1 // Written in the D programming language.
2 /*
3 NYSL Version 0.9982
4 
5 A. This software is "Everyone'sWare". It means:
6   Anybody who has this software can use it as if he/she is
7   the author.
8 
9   A-1. Freeware. No fee is required.
10   A-2. You can freely redistribute this software.
11   A-3. You can freely modify this software. And the source
12       may be used in any software with no limitation.
13   A-4. When you release a modified version to public, you
14       must publish it with your name.
15 
16 B. The author is not responsible for any kind of damages or loss
17   while using or misusing this software, which is distributed
18   "AS IS". No warranty of any kind is expressed or implied.
19   You use AT YOUR OWN RISK.
20 
21 C. Copyrighted to Kazuki KOMATSU
22 
23 D. Above three clauses are applied both to source and binary
24   form of this software.
25 */
26 
27 /**
28 このモジュールは、信号処理のための試験的なモジュールです。
29 このモジュールで定義されているストリームは、Rangeに拡張を加える事で大量のデータを処理することに最適です。
30 また、状況によってはSIMD最適化を狙えるよう設計になっています。
31 */
32 module carbon.stream;
33 
34 import carbon.math,
35        carbon.functional,
36        carbon.nonametype,
37        carbon.simd;
38 
39 import std.algorithm,
40        std.container,
41        std.file,
42        std.functional,
43        std.math,
44        std.range,
45        std.stdio,
46        std.traits,
47        std.typecons;
48 
49 
50 /**
51 入力ストリームは、`read`メソッドに与えられたバッファに要素を格納可能な型です。
52 実行時には、次のような特徴を満たさなければいけません。
53 
54 + `stream.empty`は、そのストリームからノンブロッキングで一要素でも取り出せる場合には`false`となる。
55 + `stream.front`は、そのストリームからノンブロッキングで一要素取り出す。
56 + `stream.popFront`は、そのストリームをノンブロッキングでひとつ進める。
57 + `stream.length`が有効である場合、この値はノンブロッキングで読み出すことができる要素数となる。
58 + `stream.read(buf)`は、ノンブロッキングで処理しなければいけない。
59 + `stream.read(buf)`は、bufのうち先頭から読み込みに成功した要素数だけのスライスを返す。
60     - `stream.length >= buf.length`のとき、常に、`buf.length == stream.read(buf).length`となる。
61 //+ `stream.closed`は、いくら待ってもそれ以上要素が取り出せない状況で`true`となる。
62 //    - `stream.closed`が真の場合、`stream.empty`であり、`lenght`が有効であれば`stream.length == 0`を満たす。
63 + `stream.fetch()`は、`stream.empty`が偽になるまで、
64     もしくはそのストリームからは今後一切データを取り出せないと判断した時点で処理を返す。
65     `stream.fetch()`は、待機後の`stream.empty()`を返す。
66 + `stream`が無限レンジの場合、`stream.read(buf)`は常に`buf`の全要素に読み込む。
67 */
68 enum bool isInputStream(T) = isInputRange!T && is(typeof((T t){
69         alias E = Unqual!(ElementType!T);
70         while(!t.empty || !t.fetch()) E[] var = t.read(new E[1024]);
71     }));
72 
73 
74 /**
75 入力ストリームから、ストリーム終端になるまで可能な限り読み出します。
76 */
77 E[] fillBuffer(S, E)(ref S s, E[] buf)
78 if(isInputStream!S)
79 {
80     E[] rem = buf;
81     while(rem.length && (!s.empty || !s.fetch()))
82         rem = rem[s.read(rem).length .. $];
83     
84     return buf[0 .. $ - rem.length];
85 }
86 
87 
88 /**
89 演算可能なストリームとは、`readOp`メソッドに与えられたバッファ上に、計算した要素を格納可能な入力ストリームです。
90 これは、次のような実行時条件を満たしている必要があります。
91 
92 + `stream.readOp!"op"(buf)`は、`a op= b`という演算をbuf上で行う。
93     つまり、
94     - op : "+"のとき、streamから読み出された値を、bufに加える演算を表す。
95     - op : "-"のとき、streamから読み出された値を、bufから引く演算を表す。
96     + op : ""のときは、`stream.read(buf)`と等価です。
97 + `stream.readOp!func(buf)`は、`func(a, b)`という演算結果をbufに格納する
98     - `stream.readOp!((a, b) => b)(buf)`は、`stream.read(buf)`と等価です。
99     - `stream.readOp!((a, b) => a + b)(buf)`は、`stream.readOp!"+"(buf)`と等価です。
100 */
101 enum bool isInplaceComputableStream(T, alias op = "", U = Unqual!(ElementType!T)) = isInputStream!T &&
102     is(typeof((T t, U[] buf){
103         buf = t.readOp!op(buf);   // as read(buf)
104     }));
105 
106 
107 /**
108 出力ストリームは、`t.write`メソッドに与えられたバッファを出力可能な型です。
109 実行時には次のような条件を満たしている必要があります。
110 
111 + `stream.write(buf)`は、ノンブロッキングでバッファを出力し、まだ出力できていないバッファを返す。
112 + `stream.fill`が偽であれば、一要素でも`stream.write`でノンブロッキング出力可能である
113 + `stream.flush()`は、ブロッキング処理で`t.fill`が偽になるか、
114     もしくはそれ以上出力不可能だと判断した時点で、`t.fill`を返す。
115 */
116 enum bool isOutputStream(T, E) = 
117     is(typeof((T t, E e){
118         E[] writeData = new E[1024];
119         while(writeData.length && (!t.fill || !t.flush())) writeData = t.write(writeData);
120     }));
121 
122 
123 /**
124 */
125 E[] fillBufferOp(alias op, S, E)(ref S s, E[] buffer)
126 {
127     auto rem = buffer;
128     while(rem.length && (!s.empty || !s.fetch()))
129         rem = s.readOp!op(rem);
130     return buffer[0 .. $ - rem.length];
131 }
132 
133 
134 /**
135 内部にバッファを持つような入力ストリームです。
136 
137 + `stream.availables`は内部のバッファを返す。
138 + `stream.consume(size)`は、内部バッファの先頭から`size`要素だけを捨てる。
139 */
140 enum bool isBufferedInputStream(T) = isInputStream!T && 
141     is(typeof((T t){
142         alias E = ElementType!T;
143         const(E)[] buf = t.availables;
144         t.consume(buf.length);
145     }));
146 
147 
148 /**
149 内部にバッファを持つような出力ストリームです。
150 
151 + `stream.buffer`は内部のバッファを返す。
152 + `stream.flush(size)`は、内部バッファの先頭から`size`要素だけ出力する。
153 */
154 enum bool isBufferedOutputStream(T) = isOutputStream!T &&
155     is(typeof((T t){
156         auto buf = t.buffer;
157         static assert(is(typeof(buf) : E[], E));
158         static assert(is(typeof(buf) == Unqual!(typeof(buf))));
159         t.flush(buf.length);
160     }));
161 
162 
163 /**
164 InplaceComputableStreamを作成する際に有用な、std.functional.binaryFunの拡張です
165 */
166 auto ref A binaryFunExt(alias op, A, B)(auto ref A a, auto ref B b)
167 if(is(typeof(mixin(`a` ~ op ~ `=b`))))
168 {
169     mixin(`a` ~ op ~ "=b;");
170     return a;
171 }
172 
173 
174 /// ditto
175 auto ref A binaryFunExt(alias func, A, B)(auto ref A a, auto ref B b)
176 if(is(typeof(a = naryFun!func(a, forward!b))))
177 {
178     a = naryFun!func(a, forward!b);
179     return a;
180 }
181 
182 
183 /**
184 レンジをisInplaceComputableStreamに変換します
185 */
186 auto toStream(R)(R range)
187 if(isInputRange!R && !isInputStream!R)
188 {
189     alias E = Unqual!(ElementType!R);
190 
191     static struct InputStreamRange()
192     {
193         R range;
194         alias range this;
195         alias closed = range.empty;
196         bool fetch(){ return range.empty; }
197 
198         T[] readOp(alias func, T)(T[] buf)
199         {
200             auto p = buf.ptr;
201             const end = () @trusted { return p + buf.length; }();
202             while(p != end && !range.empty){
203                 binaryFunExt!func(*p, range.front);
204                 range.popFront();
205                 () @trusted { ++p; }();
206             }
207 
208             return () @trusted { return buf[0 .. cast(size_t)(p - buf.ptr)]; }();
209         }
210 
211 
212         T[] read(T)(T[] buf) { return readOp!""(buf); }
213 
214         //mixin(defaultStreamSIMDOperator);
215     }
216 
217 
218     return InputStreamRange!()(range);
219 }
220 
221 
222 /**
223 正確なComplexNCO
224 */
225 auto preciseComplexNCO(real freq, real deltaT, real theta = 0) pure nothrow @safe @nogc
226 {
227     static struct PreciseComplexNCO()
228     {
229         creal front() const @property { return this[0]; }
230         void popFront() { _theta += _dt2PI * _freq; }
231         enum bool empty = false;
232         PreciseComplexNCO!() save() const @property { return this; }
233         creal opIndex(size_t i) const @property { return std.math.expi(_theta + i * _dt2PI * _freq); }
234         struct OpDollar{} enum OpDollar opDollar = OpDollar();
235         PreciseComplexNCO!() opSlice() const { return this; }
236         PreciseComplexNCO!() opSlice(size_t i, OpDollar) const { PreciseComplexNCO!() dst = this; dst._theta += i * _dt2PI * _freq; return dst; }
237         auto opSlice(size_t i, size_t j) const { return this[i .. $].take(j - i); }
238 
239         bool fetch() { return false; }
240 
241         T[] readOp(alias op, T)(T[] buf)
242         {
243             auto p = buf.ptr;
244             const end = () @trusted { return p + buf.length; }();
245             while(p != end){
246                 binaryFunExt!op(*p, std.math.expi(_theta));
247                 _theta += _dt2PI * _freq;
248                 () @trusted { ++p; }();
249             }
250 
251             return buf;
252         }
253 
254 
255         Cpx[] read(Cpx)(Cpx[] buf){ return readOp!""(buf); }
256 
257         //mixin(defaultStreamSIMDOperator);
258 
259       @property
260       {
261         real freq() const @property { return _freq; }
262         void freq(real f) @property { _freq = f; }
263 
264         real deltaT() const @property { return _dt2PI / 2 / PI; }
265         void deltaT(real dt) @property { _dt2PI = dt * 2 * PI; }
266       }
267 
268       private:
269         real _freq;
270         real _dt2PI;
271         real _theta;
272     }
273 
274     return PreciseComplexNCO!()(freq, deltaT * 2 * PI, theta);
275 }
276 
277 
278 ///
279 unittest
280 {
281     scope(failure) {writefln("Unittest failure :%s(%s)", __FILE__, __LINE__); stdout.flush();}
282     scope(success) {writefln("Unittest success :%s(%s)", __FILE__, __LINE__); stdout.flush();}
283 
284     auto sig1 = preciseComplexNCO(1000, 0.001, 0);
285     static assert(isInputStream!(typeof(sig1)));
286     static assert(isInplaceComputableStream!(typeof(sig1)));
287     static assert(is(ElementType!(typeof(sig1)) == creal));
288 
289     assert(equal!((a, b) => approxEqual(a.re, b.re) && approxEqual(a.im, b.im))
290                 (sig1[0 .. 4], cast(creal[])[1, 1, 1, 1]));
291 
292     sig1 = preciseComplexNCO(1, 0.25, 0);
293     assert(equal!((a, b) => approxEqual(a.re, b.re) && approxEqual(a.im, b.im))
294                 (sig1[0 .. 4], cast(creal[])[1, 0+1i, -1, -1i]));
295 
296     sig1 = preciseComplexNCO(1000, 10.0L^^-6, std.math.E);
297     auto buf = sig1[0 .. 1024].array;
298     assert(sig1.readOp!"-"(buf).length == buf.length);
299     assert(equal!"a == 0"(buf, buf));
300 
301     sig1.readOp!"a+b"(buf);
302 }
303 
304 
305 
306 /**
307 Lookup-Table方式の高速な局部発振器を提供します。
308 テンプレートパラメータの`func`には周期2PIの周期関数を与えることが出来ます。
309 たとえば、`std.math.expi`を与えれば複素発振器となり、`std.math.sin`であれば正弦波を出力します。
310 
311 この局部発振器は周波数の動的な制御が可能なので、信号の周波数をフィードバック制御する際に用いることができます。
312 Lookup-Tableは、テンプレートパラメータ毎にプログラムの初期化時に生成されるので、
313 最初の初期化が終われば、実行コストはテーブルの参照のみになり、高速にアクセス可能です。
314 また、テーブル長を伸ばしても初期化コストが増加するだけで、実行コストはあまり大きくならないことも特徴です。
315 */
316 template lutNCO(alias func, size_t divN)
317 if(isPowOf2(divN))
318 {
319     alias E = typeof(func(0.0));
320     immutable(E[]) table;
321 
322     shared static this()
323     {
324         E[divN] t;
325         foreach(i, ref e; t)
326             e = func(i * 2 * PI / divN);
327 
328         table = t.dup;
329     }
330 
331 
332     struct LutNCO()
333     {
334         E front() const @property { return table[cast(ptrdiff_t)(_phase) & (divN - 1)]; }
335         void popFront() { _phase += _freq * _deltaT * divN; _phase %= divN; }
336         enum bool empty = false;
337         LutNCO save() const @property { return this; }
338         E opIndex(size_t i) const { return table[cast(ptrdiff_t)(_phase + i * _freq * _deltaT * divN) & (divN - 1)]; }
339         struct OpDollar{} enum opDollar = OpDollar();
340         LutNCO opSlice() const { return this; }
341         LutNCO opSlice(size_t i, OpDollar) const { auto dst = this[]; dst._phase += i * _freq * _deltaT; return dst; }
342         auto opSlice(size_t i, size_t j) const { return this[i .. $].take(j - i); }
343 
344         real phase() const @property { return _phase; }
345         void phase(real p) @property { _phase = p; }
346         real freq() const @property { return _freq; }
347         void freq(real f) @property { _freq = f; }
348         real deltaT() const @property { return _deltaT; }
349         void deltaT(real t) @property { _deltaT = t; }
350 
351         bool fetch() { return false; }
352 
353         T[] readOp(alias op, T)(T[] buf)
354         if(is(typeof(binaryFunExt!op(buf[0], table[0]))))
355         {
356             immutable dph = _freq * _deltaT * divN;
357             auto p = buf.ptr;
358             const end = () @trusted { return p + buf.length; }();
359             while(p != end){
360                 binaryFunExt!op(*p, table[cast(ptrdiff_t)(_phase) & (divN - 1)]);
361                 _phase += dph;
362                 () @trusted { ++p; }();
363             }
364 
365             return buf[0 .. $];
366         }
367 
368         auto readOp(alias op, E, size_t N)(SIMDArray!(E, N) buf)
369         if(is(typeof(binaryFunExt!op(buf[0], _table[0]))))
370         {
371             immutable dph = _freq * _deltaT * divN * N;
372             auto p = buf[].ptr;
373             const end = () @trusted { return p + buf.length; }();
374             while(p != end){
375                 binaryFunExt!op(*p, table[cast(ptrdiff_t)(_phase) & (divN - 1)]);
376                 _phase += dph;
377                 () @trusted { ++p; }();
378             }
379 
380             return buf;
381         }
382 
383         T[] read(T)(T[] buf) @safe { return readOp!""(buf); }
384 
385         auto read(E, size_t N)(SIMDArray!(E, N) buf) { return readOp!""(buf); }
386 
387       private:
388         real _phase;
389         real _freq;
390         real _deltaT;
391     }
392 
393 
394     auto lutNCO(real freq, real deltaT, real theta = 0) pure nothrow @safe @nogc
395     {
396         return .lutNCO(table, freq, deltaT, theta);
397     }
398 }
399 
400 
401 ///
402 unittest{
403     auto sig1 = lutNCO!(std.math.expi, 4)(1, 0.25, 0);
404     static assert(isInputStream!(typeof(sig1)));
405     static assert(isInplaceComputableStream!(typeof(sig1)));
406     static assert(is(ElementType!(typeof(sig1)) : creal));
407 
408     assert(equal!((a, b) => approxEqual(a.re, b.re) && approxEqual(a.im, b.im))
409         (sig1[0 .. 1024], preciseComplexNCO(1, 0.25, 0)[0 .. 1024]));
410 
411     sig1 = lutNCO!(std.math.expi, 4)(1000, 10.0L^^-6, std.math.E);
412     auto buf = sig1[0 .. 1024].array;
413     assert(sig1.readOp!"-"(buf).length == buf.length);
414     assert(equal!"a == 0"(buf, buf));
415 }
416 
417 
418 /**
419 */
420 auto lutNCO(R)(R range, real freq, real deltaT, real theta = 0) pure nothrow @safe @nogc
421 if(isRandomAccessRange!R && hasLength!R)
422 {
423     alias E = Unqual!(std.range.ElementType!R);
424 
425     static struct LutNCO()
426     {
427         E front() @property { return _table[cast(ptrdiff_t)(_phase) % $]; }
428         void popFront() { _phase += _freq * _deltaT * _table.length; _phase %= _table.length; }
429         enum bool empty = false;
430         auto save() @property { return .lutNCO(_table.save, _freq, _deltaT, _phase); }
431         E opIndex(size_t i) { return _table[cast(ptrdiff_t)(_phase + i * _freq * _deltaT * $) % $]; }
432         struct OpDollar{} enum opDollar = OpDollar();
433         auto opSlice() { return .lutNCO(_table.save, _freq, _deltaT, _phase); }
434         auto opSlice(size_t i, OpDollar) { auto dst = this[]; dst._phase += i * _freq * _deltaT; return dst; }
435         auto opSlice(size_t i, size_t j) { return this[i .. $].take(j - i); }
436 
437       static if(is(typeof((const R r, size_t i){auto e = r[i];})))
438       {
439         E front() const @property { return _table[cast(ptrdiff_t)(_phase) % $]; }
440         E opIndex(size_t i) const { return _table[cast(ptrdiff_t)(_phase + i * _freq * _deltaT * $) % $]; }
441       }
442 
443       static if(is(typeof((const R r){r.save();})))
444       {
445         auto save() const @property { return .lutNCO(_table.save, _freq, _deltaT, _phase); }
446         auto opSlice() const { return .lutNCO(_table.save, _freq, _deltaT, _phase); }
447         auto opSlice(size_t i, OpDollar) const { auto dst = this[]; dst._phase += i * _freq * _deltaT; return dst; }
448         auto opSlice(size_t i, size_t j) const { return this[i .. $].take(j - i); }
449       }
450 
451         real phase() const @property { return _phase; }
452         void phase(real p) @property { _phase = p; }
453         real freq() const @property { return _freq; }
454         void freq(real f) @property { _freq = f; }
455         real deltaT() const @property { return _deltaT; }
456         void deltaT(real t) @property { _deltaT = t; }
457 
458         bool fetch() { return false; }
459 
460         T[] readOp(alias op, T)(T[] buf)
461         if(is(typeof(binaryFunExt!op(buf[0], _table[0]))))
462         {
463             immutable dph = _freq * _deltaT * _table.length;
464             auto p = buf.ptr;
465             const end = () @trusted { return p + buf.length; }();
466             while(p != end){
467                 binaryFunExt!op(*p, _table[cast(ptrdiff_t)(_phase) % $]);
468                 _phase += dph;
469                 () @trusted { ++p; }();
470             }
471 
472             return buf[0 .. $];
473         }
474 
475 
476         auto readOp(alias op, E, size_t N)(SIMDArray!(E, N) buf)
477         if(is(typeof(binaryFunExt!op(buf[0], _table[0]))))
478         {
479             immutable divN = _table.length;
480 
481             immutable dph = _freq * _deltaT * divN * N;
482             auto p = buf[].ptr;
483             const end = () @trusted { return p + buf.length; }();
484             while(p != end){
485                 binaryFunExt!op(*p, _table[cast(ptrdiff_t)(_phase) % $]);
486                 _phase += dph;
487                 () @trusted { ++p; }();
488             }
489 
490             return buf;
491         }
492 
493 
494         T[] read(T)(T[] buf) { return readOp!""(buf); }
495 
496         auto read(E, size_t N)(SIMDArray!(E, N) buf) { return readOp!""(buf); }
497 
498         //mixin(defaultStreamSIMDOperator);
499 
500 
501       private:
502         R _table;
503         real _phase;
504         real _freq;
505         real _deltaT;
506     }
507 
508 
509     return LutNCO!()(range, theta, freq, deltaT);
510 }
511 
512 
513 /**
514 永遠とその配列をループし続けるストリーム
515 */
516 auto repeatStream(E)(const E[] array)
517 in{
518     assert(array.length);
519 }
520 body{
521     static struct RepeatStream()
522     {
523         const(E) front() const @property { return _arr[_pos]; }
524         enum empty = false;
525         void popFront() { ++_pos; _pos %= _arr.length; }
526         RepeatStream!() save() const @property { return this; }
527         const(E) opIndex(size_t i) const { return _arr[_pos % $]; }
528         RepeatStream!() opSlice(){ return this; }
529         struct OpDollar{} enum opDollar = OpDollar();
530         RepeatStream!() opSlice(size_t i, OpDollar){ typeof(return) dst; dst._pos = (_pos + i) % _arr.length; return dst; }
531         auto opSlice(size_t i, size_t j){ return this[i .. $].take(j - i); }
532 
533         bool fetch() { return false; }
534 
535         U[] readOp(alias op, U)(U[] buf)
536         {
537             auto rem = buf;
538             while(rem.length){
539                 auto minL = min(rem.length, _arr.length - _pos);
540 
541               static if(is(typeof({ mixin(`rem[]` ~ op ~ `=_arr[];`); })))
542                 mixin(`rem[0 .. minL] `~ op ~ "= _arr[_pos .. _pos + minL];");
543               else{
544                 auto rem_ptr = rem.ptr,
545                      buf_ptr = () @trusted { return _arr.ptr + _pos; }();
546                 const rem_end = () @trusted { return rem_ptr + minL; }();
547 
548                 while(rem_ptr != rem_end){
549                     binaryFunExt!op(*rem_ptr, *buf_ptr);
550                     () @trusted { ++rem_ptr; ++buf_ptr; }();
551                 }
552               }
553               _pos += minL;
554               _pos %= _arr.length;
555               rem = rem[minL .. $];
556             }
557 
558             return buf;
559         }
560 
561 
562         U[] read(U)(U[] buf) { return readOp!""(buf); }
563 
564         //mixin(defaultStreamSIMDOperator);
565 
566 
567         const(E)[] availables() @property { return _arr[_pos .. $]; }
568         void consume(size_t n) {
569             _pos += n;
570             _pos %= _arr.length;
571         }
572 
573       private:
574         const(E)[] _arr;
575         size_t _pos;
576     }
577 
578 
579     return RepeatStream!()(array, 0);
580 }
581 
582 
583 /// ditto
584 auto repeatStream(E)(E element)
585 if(!is(E : T[], T))
586 {
587     static struct RepeatStream()
588     {
589         enum bool empty = false;
590         auto front() const @property { return _e; }
591         void popFront() {}
592         auto save() @property { return this; }
593         auto save() const @property { return .repeatStream(_e); }
594         auto save() immutable @property { return .repeatStream(_e); }
595         auto opIndex(size_t) const @property { return _e; }
596         RepeatStream!() opSlice() const { return this; }
597         struct OpDollar{} enum opDollar = OpDollar();
598         RepeatStream!() opSlice(size_t, OpDollar) const { return this; }
599         auto opSlice(size_t i , size_t j) const { return this.take(j - i); }
600 
601         bool fetch() { return false; }
602 
603 
604         U[] readOp(alias op, U)(U[] buf)
605         {
606           static if(is(typeof({ mixin(`buf[] ` ~ op ~ "= _e;"); })))
607             mixin(`buf[] ` ~ op ~ "= _e;");
608           else
609           {
610             auto p = buf.ptr,
611                  e = () @trusted { return p + buf.length; }();
612 
613             while(p != e){
614                 binaryFunExt!op(*p, _e);
615                 () @trusted { ++p; }();
616             }
617           }
618 
619             return buf;
620         }
621 
622 
623         U[] read(U)(U[] buf)
624         {
625             return readOp!""(buf);
626         }
627 
628         //mixin(defaultStreamSIMDOperator);
629 
630       private:
631         E _e;
632     }
633 
634 
635     return RepeatStream!()(element);
636 }
637 
638 ///
639 unittest{
640     int[] arr = [0, 1, 2, 3, 4, 5, 6, 7];
641     auto rs = arr.repeatStream;
642     static assert(isInfinite!(typeof(rs)));
643     static assert(isInputStream!(typeof(rs)));
644     static assert(isInplaceComputableStream!(typeof(rs)));
645     static assert(isBufferedInputStream!(typeof(rs)));
646 
647 
648     int[] buf1 = new int[24];
649     assert(rs.readOp!"+"(buf1) == arr ~ arr ~ arr);
650 
651     short[] buf2 = new short[17];
652     assert(rs.readOp!"cast(short)b"(buf2) == buf1[0 .. 17]);
653 }
654 
655 ///
656 unittest{
657     auto rs = 1.repeatStream;
658     static assert(isInfinite!(typeof(rs)));
659     static assert(isInputStream!(typeof(rs)));
660     static assert(isInplaceComputableStream!(typeof(rs)));
661 
662     assert(rs.read(new int[4]) == [1, 1, 1, 1]);
663 }
664 
665 
666 /**
667 一度に巨大なファイルを読み込むことに特化した,バッファ持ち入力ストリームです。
668 */
669 auto rawFileStream(T)(string filename, size_t bufferSize = 1024 * 1024)
670 if(is(Unqual!T == T))
671 {
672     static struct RawFileStream()
673     {
674         private void refill()
675         {
676             _pos = 0;
677             if(!_file.isOpen){
678                 _buffer = null;
679                 return;
680             }
681 
682             _pos = 0;
683             immutable beforeSize = _buffer.length;
684             _buffer = _file.rawRead(_buffer);
685             if(_buffer.length != beforeSize)
686                 _file.detach();
687         }
688 
689 
690         this(File file, T[] buf)
691         {
692             _file = file;
693             _buffer = buf;
694             _pos = 0;
695         }
696 
697 
698         /** 入力レンジのプリミティブ
699         */
700         T front() const @property { return _buffer[_pos]; }
701         bool empty() const @property { return _buffer.length == _pos; }     /// ditto
702         void popFront() { ++_pos; }                                         /// ditto
703         size_t length() const @property { return _buffer.length - _pos; }   /// ditto
704 
705 
706         /**
707         入力ストリームのプリミティブ
708         */
709         bool fetch()
710         {
711             if(!this.empty) return false;
712             refill();
713             return this.empty;
714         }
715 
716 
717         /// ditto
718         E[] readOp(alias func, E)(E[] buf)
719         if(is(typeof(binaryFunExt!func(buf[0], _buffer[0]))))
720         {
721             immutable minL = min(buf.length, _buffer.length - _pos);
722           static if(is(typeof(mixin(`buf[]` ~ func ~ `= _buffer[]`))))
723             mixin(`buf[0 .. minL] ` ~ func ~ "= _buffer[_pos .. _pos + minL];");
724           else
725           {
726             auto rem_ptr = buf.ptr,
727                  buf_ptr = () @trusted { return _buffer.ptr + _pos; }();
728             const rem_end = () @trusted { return rem_ptr + minL; }();
729 
730             while(rem_ptr != rem_end){
731                 binaryFunExt!func(*rem_ptr, *buf_ptr);
732                 () @trusted { ++rem_ptr; ++buf_ptr; }();
733             }
734           }
735             _pos += minL;
736             return buf[0 .. minL];
737         }
738 
739 
740         /// ditto
741         E[] read(E)(E[] buf)
742         //if(isAssignable!(E, T))
743         { return readOp!""(buf); }
744 
745 
746         //mixin(defaultStreamSIMDOperator);
747 
748 
749         /**
750         バッファ持ち入力レンジのプリミティブ
751         */
752         const(T)[] availables() const @property { return _buffer[_pos .. $]; }
753 
754 
755         /**
756         バッファ持ち入力レンジのプリミティブ
757         */
758         void consume(size_t n)
759         {
760             while(n && !this.empty){
761                 if(n >= _buffer.length - _pos){
762                     n -= _buffer.length - _pos;
763                     if(_file.isOpen)
764                         refill();
765                     else{
766                         _buffer = null;
767                         _pos = 0;
768                     }
769                 }
770                 else{
771                     _pos += n;
772                     n = 0;
773                 }
774             }
775         }
776 
777 
778       private:
779         T[] _buffer;
780         size_t _pos;
781         File _file;
782     }
783 
784 
785     auto dst = RefCounted!(RawFileStream!())(File(filename), new T[bufferSize]);
786     dst.refill();
787     return dst;
788 }
789 
790 ///
791 unittest{
792     immutable fname = "testData.dat";
793     std.file.write(fname, cast(ubyte[])[0, 1, 2, 3, 4, 5]); // 6byte
794     scope(exit)
795         std.file.remove(fname);
796 
797     auto sig1 = rawFileStream!int(fname);
798     static assert(isInputStream!(typeof(sig1)));
799     static assert(isInplaceComputableStream!(typeof(sig1)));
800     static assert(isBufferedInputStream!(typeof(sig1)));
801 
802     assert(sig1.availables.length == 1);
803     sig1.consume(1);
804     assert(sig1.empty && sig1.fetch());
805 }
806 
807 ///
808 unittest{
809     immutable fname = "testData.dat";
810     std.file.write(fname, [0, 1, 2, 3, 4, 5]); // 24byte
811     scope(exit)
812         std.file.remove(fname);
813 
814     auto sig1 = rawFileStream!int(fname);
815     assert(sig1.availables == [0, 1, 2, 3, 4, 5]);
816     sig1.consume(6);
817     assert(sig1.empty && sig1.fetch());
818 }
819 
820 
821 /**
822 二つの信号の積を返します。
823 2つ目の信号は演算による理想信号でなければいけません。
824 */
825 auto mixer(Sg1, Sg2)(Sg1 sg1, Sg2 sg2)
826 if(isInputStream!Sg1 && isInplaceComputableStream!(Sg2, "*") && isInfinite!Sg2)
827 {
828     static struct Mixer()
829     {
830         auto ref front() const @property { return _sg1.front * _sg2.front; }
831 
832       static if(isInfinite!Sg1)
833         enum bool empty = false;
834       else
835         bool empty() const @property { return _sg1.empty; }
836 
837         void popFront() { _sg1.popFront(); _sg2.popFront(); }
838 
839         bool fetch()
840         {
841           static if(isInfinite!Sg1)
842             return false;
843           else
844           {
845             if(!_sg1.empty) return false;
846             //_sg2.fetch();
847             return _sg1.fetch();
848           }
849         }
850 
851 
852         E[] read(E)(E[] buf)
853         {
854             return readOp!""(buf);
855         }
856 
857 
858         auto read(E, size_t N)(SIMDArray!(E, N) buf)
859         {
860             return readOp!""(buf);
861         }
862 
863 
864         E[] readOp(alias op, E)(E[] buf)
865         if((op == "*" || op == "/")
866         && isInplaceComputableStream!(Sg1, op)
867         && isInplaceComputableStream!(Sg2, op))
868         {
869             return _sg2.readOp!op(_sg1.readOp!op(buf));
870         }
871 
872 
873         E[] readOp(alias op : "", E)(E[] buf)
874         {
875             return _sg2.readOp!"*"(_sg1.read(buf));
876         }
877 
878 
879         auto readOp(alias op, E, size_t N)(SIMDArray!(E, N) buf)
880         if((op == "*" || op == "/")
881         && isInplaceComputableStream!(Sg1, op)
882         && isInplaceComputableStream!(Sg2, op))
883         {
884             return _sg2.readOp!op(_sg1.readOp!op(buf));
885         }
886 
887 
888         auto readOp(alias op : "", E, size_t N)(SIMDArray!(E, N) buf)
889         {
890             return _sg2.readOp!"*"(_sg1.read(buf));
891         }
892 
893 
894       private:
895         Sg1 _sg1;
896         Sg2 _sg2;
897     }
898 
899 
900     return Mixer!()(sg1, sg2);
901 }
902 
903 
904 ///
905 unittest
906 {
907     scope(failure) {writefln("Unittest failure :%s(%s)", __FILE__, __LINE__); stdout.flush();}
908     scope(success) {writefln("Unittest success :%s(%s)", __FILE__, __LINE__); stdout.flush();}
909 
910     auto arr1 = [0, 1, 0, 1, 0, 1, 0, 1].repeatStream;
911     auto arr2 = [0, 0, 1, 1, 0, 0, 1, 1].repeatStream;
912     auto mx1 = arr1.mixer(arr2);
913     static assert(isInfinite!(typeof(mx1)));
914     static assert(isInputStream!(typeof(mx1)));
915 
916     int[] buf1 = new int[16];
917     assert(mx1.read(buf1) == [0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 1]);
918 
919     auto arr3 = [0, 0, 0, 0, 1, 1, 1, 1].repeatStream;
920     auto mx2 = mx1.mixer(arr3);
921 
922     int[] buf2 = new int[16];
923     assert(mx2.read(buf2) == [0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1]);
924 }
925 
926 
927 /**
928 二つの信号を足します。
929 2つ目の信号は、演算により生成された理想信号でなければいけません。
930 */
931 auto adder(Sg1, Sg2)(Sg1 sg1, Sg2 sg2)
932 if(isInputStream!Sg1 && isInplaceComputableStream!(Sg2, "+") && isInfinite!Sg2)
933 {
934     static struct Adder()
935     {
936         auto ref front() const @property { return _sg1.front + _sg2.front; }
937 
938       static if(isInfinite!Sg1)
939         enum bool empty = false;
940       else
941         bool empty() const @property { return _sg1.empty; }
942 
943         void popFront() { _sg1.popFront(); _sg2.popFront(); }
944 
945         bool fetch() {
946           static if(isInfinite!Sg1)
947             return false;
948           else
949           {
950             if(!_sg1.empty) return false;
951             //_sg2.fetch();
952             return _sg1.fetch();
953           }
954         }
955 
956         E[] read(E)(E[] buf)
957         {
958             return readOp!""(buf);
959         }
960 
961 
962         E[] readOp(alias op, E)(E[] buf)
963         if((op == "+" || op == "-")
964         && isInplaceComputableStream!(Sg1, op)
965         && isInplaceComputableStream!(Sg2, op))
966         {
967             return _sg2.readOp!op(_sg1.readOp!op(buf));
968         }
969 
970 
971         auto readOp(alias op, E, size_t N)(SIMDArray!(E, N) buf)
972         {
973             return _sg2.readOp!op(_sg1.readOp!op(buf));
974         }
975 
976 
977         E[] readOp(alias op : "", E)(E[] buf)
978         {
979             return _sg2.readOp!"+"(_sg1.read(buf));
980         }
981 
982 
983         auto readOp(alias op, E, size_t N)(SIMDArray!(E, N) buf)
984         {
985             return _sg2.readOp!"+"(_sg1.readOp(buf));
986         }
987 
988 
989       private:
990         Sg1 _sg1;
991         Sg2 _sg2;
992     }
993 
994 
995     return Adder!()(sg1, sg2);
996 }
997 
998 
999 ///
1000 unittest
1001 {
1002     scope(failure) {writefln("Unittest failure :%s(%s)", __FILE__, __LINE__); stdout.flush();}
1003     scope(success) {writefln("Unittest success :%s(%s)", __FILE__, __LINE__); stdout.flush();}
1004 
1005     auto arr1 = [0, 1, 0, 1, 0, 1, 0, 1].repeatStream;
1006     auto arr2 = [0, 0, 1, 1, 0, 0, 1, 1].repeatStream;
1007     auto mx1 = arr1.adder(arr2);
1008     static assert(isInfinite!(typeof(mx1)));
1009     static assert(isInputStream!(typeof(mx1)));
1010 
1011     int[] buf1 = new int[16];
1012     assert(mx1.read(buf1) == [0, 1, 1, 2, 0, 1, 1, 2, 0, 1, 1, 2, 0, 1, 1, 2]);
1013 }
1014 
1015 ///
1016 unittest
1017 {
1018     scope(failure) {writefln("Unittest failure :%s(%s)", __FILE__, __LINE__); stdout.flush();}
1019     scope(success) {writefln("Unittest success :%s(%s)", __FILE__, __LINE__); stdout.flush();}
1020 
1021     auto arr1 = [0, 1, 0, 1, 0, 1, 0, 1].repeatStream;
1022     auto buf1 = arr1.adder(arr1).adder(arr1).read(new int[16]);
1023     auto buf2 = arr1.amplifier(3).read(new int[16]);
1024 
1025     assert(buf1 == buf2);
1026 }
1027 
1028 
1029 /**
1030 積分器です。
1031 積分するサンプル量が大きい場合の使用に適しています。
1032 */
1033 auto accumulator(E)(size_t integN)
1034 {
1035     static struct Accumulator
1036     {
1037         E[] write(E[] buf)
1038         {
1039             while(buf.length){
1040                 immutable n = min(_integN - _cnt, buf.length);
1041                 auto ptr = buf.ptr;
1042                 const end = buf.ptr + n;
1043                 auto val = _buffer.back;
1044                 while(ptr != end){
1045                     val += *ptr;
1046                     ++ptr;
1047                 }
1048 
1049                 _cnt += n;
1050                 _buffer.back = val;
1051                 buf = buf[n .. $];
1052                 if(_cnt == _integN){
1053                     _buffer ~= cast(E)0;
1054                     _cnt = 0;
1055                     ++_avaN;
1056                 }
1057             }
1058 
1059             return buf;
1060         }
1061 
1062 
1063         enum bool fill = false;
1064         bool flush() { return false; }
1065 
1066 
1067         auto opSlice(){ return _buffer[].take(_avaN); }
1068 
1069         E front() const @property { return _buffer.front; }
1070         void popFront() { _buffer.removeFront(); --_avaN; }
1071         bool empty() const @property { return _avaN == 0; }
1072 
1073 
1074       private:
1075         DList!E _buffer;
1076         size_t _cnt;
1077         size_t _integN;
1078         size_t _avaN;
1079     }
1080 
1081 
1082     return Accumulator(DList!E(cast(E)0), 0, integN, 0);
1083 }
1084 
1085 ///
1086 unittest
1087 {
1088     scope(failure) {writefln("Unittest failure :%s(%s)", __FILE__, __LINE__); stdout.flush();}
1089     scope(success) {writefln("Unittest success :%s(%s)", __FILE__, __LINE__); stdout.flush();}
1090 
1091     auto acc = accumulator!int(4);
1092     static assert(isOutputStream!(typeof(acc), int));
1093 
1094     assert(acc.write([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]).empty);
1095     assert(equal(acc[], [0+1+2+3, 4+5+6+7]));
1096 
1097     acc.write([10, 11]);
1098     assert(equal(acc[], [0+1+2+3, 4+5+6+7, 8+9+10+11]));
1099 }
1100 
1101 
1102 /**
1103 積分器です。
1104 たとえば、連続する少数の個数のサンプルの和を取るような用途に適しています。
1105 */
1106 auto accumulator(size_t N, Sg)(Sg sg, size_t bufSize = 1024 * 1024)
1107 {
1108     alias E = Unqual!(ElementType!Sg);
1109     return accumulator(sg, new E[bufSize]);
1110 }
1111 
1112 
1113 /// ditto
1114 auto accumulator(size_t N, Sg, E)(Sg sg, E[] buffer)
1115 if(isInputStream!Sg && is(ElementType!Sg : E))
1116 {
1117     static struct Accumulator()
1118     {
1119         auto front() const @property { return availables[0]; }
1120         void popFront() { consume(1); }
1121         bool empty() const @property { return _pos == _end; }
1122 
1123         bool fetch()
1124         {
1125             if(!this.empty) return false;
1126             auto p = _buffer.ptr + _buffer.length - _remN,
1127                  f = _buffer.ptr;
1128             const e = _buffer.ptr + _buffer.length;
1129             while(p != e) { *f = *p; ++f; ++p; }
1130             _pos = 0;
1131             _end = 0;
1132 
1133             while(this.empty && (!_sg.empty || !_sg.fetch())){
1134                 auto buf = _sg.read(_buffer[_remN .. $]);
1135                 immutable size_t len = (_remN + buf.length) / N;
1136                 _end += len;
1137 
1138                 auto pp = _buffer.ptr,
1139                      ff = _buffer.ptr;
1140                 auto ee = _buffer.ptr + _end;
1141 
1142                 while(pp != ee){
1143                     size_t s = 1;
1144                     if(pp != _buffer.ptr) { *pp = 0; s = 0; }
1145 
1146                     ff += s;
1147                     foreach(i; s .. N){
1148                         *pp += *ff;
1149                         ++ff;
1150                     }
1151 
1152                     ++pp;
1153                 }
1154                 _remN = (_remN + buf.length) - _end * N;
1155             }
1156 
1157             return this.empty;
1158         }
1159 
1160 
1161         T[] readOp(alias func, T)(T[] buf)
1162         {
1163             immutable minL = min(buf.length, _end - _pos);
1164           static if(is(typeof(mixin(`buf[]` ~ func ~ `= _buffer[]`))))
1165             mixin(`buf[0 .. minL] ` ~ func ~ "= _buffer[_pos .. _pos + minL];");
1166           else
1167           {
1168             auto rem_ptr = buf.ptr,
1169                  buf_ptr = () @trusted { return _buffer.ptr + _pos; }();
1170             const rem_end = () @trusted { return rem_ptr + minL; }();
1171 
1172             while(rem_ptr != rem_end){
1173                 binaryFunExt!func(*rem_ptr, *buf_ptr);
1174                 () @trusted { ++rem_ptr; ++buf_ptr; }();
1175             }
1176           }
1177             _pos += minL;
1178             return buf[0 .. minL];
1179         }
1180 
1181 
1182         auto readOp(alias func, E, size_t N)(SIMDArray!(E, N) buf)
1183         {
1184             auto b = readOp(buf.array);
1185             return buf[0 .. b.length];
1186         }
1187 
1188 
1189         T[] read(T)(T[] buf) { return readOp!""(buf); }
1190 
1191         //mixin(defaultStreamSIMDOperator);
1192 
1193 
1194         const(E)[] availables() const @property { return _buffer[_pos .. _end]; }
1195         void consume(size_t n)
1196         {
1197             while(n && (!this.empty || !this.fetch())){
1198                 if(n <= _end - _pos){
1199                     _pos += n;
1200                     n = 0;
1201                 }else{
1202                     n -= _end - _pos;
1203                     _pos = _end;
1204                 }
1205             }
1206         }
1207 
1208 
1209       private:
1210         Sg _sg;
1211         E[] _buffer;
1212         size_t _pos;
1213         size_t _end;
1214         size_t _remN;
1215     }
1216 
1217 
1218     return Accumulator!()(sg, buffer, 0, 0, 0);
1219 }
1220 
1221 ///
1222 unittest
1223 {
1224     scope(failure) {writefln("Unittest failure :%s(%s)", __FILE__, __LINE__); stdout.flush();}
1225     scope(success) {writefln("Unittest success :%s(%s)", __FILE__, __LINE__); stdout.flush();}
1226 
1227     auto arr1 = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9].repeatStream;
1228     auto acc = accumulator!4(arr1, new int[14]);
1229     static assert(isBufferedInputStream!(typeof(acc)));
1230 
1231     int[] buf = new int[3];
1232     assert(!acc.empty || !acc.fetch());
1233     assert(acc.read(buf) == [0+1+2+3, 4+5+6+7, 8+9+0+1]);
1234     assert(!acc.empty || !acc.fetch());
1235     assert(acc.read(buf) == [2+3+4+5, 6+7+8+9, 0+1+2+3]);
1236 }
1237 
1238 /+ ベンチマーク, rangeの3倍程度の性能
1239 unittest {
1240     static real[] vector;
1241     vector = new real[1024 * 1024];
1242     foreach(ref e; vector) e = 0;
1243 
1244 
1245     void funcStream()
1246     {
1247         alias sin128LutNCO = lutNCO!(std.math.sin, 128);
1248 
1249         auto mx = mixer(sin128LutNCO(6.5E6L, 1E-6L, 0),
1250                         sin128LutNCO(6.6E6L, 1E-6L, 0));
1251 
1252         mx.readOp!"*"(vector);
1253     }
1254 
1255 
1256     void funcRange()
1257     {
1258         alias sin128LutNCO = lutNCO!(std.math.sin, 128);
1259 
1260         auto mx = zip(sin128LutNCO(6.5E6L, 1E-6L, 0), sin128LutNCO(6.6E6L, 1E-6L, 0))
1261                  .map!"a[0]*a[1]";
1262 
1263         auto ptr = vector.ptr, end = ptr + vector.length;
1264         while(ptr != end){
1265             *ptr *= mx.front;
1266             mx.popFront();
1267             ++ptr;
1268         }
1269     }
1270 
1271     
1272     import std.datetime;
1273     import std.container;
1274     auto ts = benchmark!(funcStream, funcRange)(26);
1275 
1276     writefln("%(\n%s%)", ts[].map!"a.usecs");
1277 }
1278 +/
1279 
1280 
1281 auto amplifier(Sg, F)(Sg sg, F gain)
1282 {
1283     static struct Amplifier()
1284     {
1285         auto front() const @property { return _sg.front * _gain; }
1286         void popFront() { _sg.popFront(); }
1287         bool empty() const { return _sg.empty; }
1288         bool fetch() { return _sg.fetch(); }
1289 
1290 
1291         E[] readOp(string op, E)(E[] buf)
1292         if((op == "*" || op == "/") && isInplaceComputableStream!(Sg, op, E))
1293         {
1294             buf = _sg.readOp!op(buf);
1295             mixin("buf[]" ~ op ~ "= _gain;");
1296 
1297             return buf;
1298         }
1299 
1300 
1301         auto readOp(string op, E, size_t N)(SIMDArray!(E, N) buf)
1302         if((op == "*" || op == "/") && isInplaceComputableStream!(Sg, op, E))
1303         {
1304             buf = _sg.readOp!op(buf);
1305             mixin("buf" ~ op ~ "= _gain;");
1306 
1307             return buf;
1308         }
1309 
1310 
1311         E[] read(E)(E[] buf)
1312         {
1313             buf = _sg.read(buf);
1314             buf[] *= _gain;
1315 
1316             return buf;
1317         }
1318 
1319 
1320         auto read(E, size_t N)(SIMDArray!(E, N) buf)
1321         {
1322             buf = _sg.read(buf);
1323             buf *= _gain;
1324         }
1325 
1326 
1327         F gain() const @property pure nothrow @safe @nogc { return _gain; }
1328         void gain(F g) @property pure nothrow @safe @nogc { _gain = g; }
1329 
1330 
1331       private:
1332         Sg _sg;
1333         F _gain;
1334     }
1335 
1336 
1337     return Amplifier!()(sg, gain);
1338 }
1339 
1340 ///
1341 unittest
1342 {
1343     scope(failure) {writefln("Unittest failure :%s(%s)", __FILE__, __LINE__); stdout.flush();}
1344     scope(success) {writefln("Unittest success :%s(%s)", __FILE__, __LINE__); stdout.flush();}
1345 
1346     auto arr4 = [0, 1, 2, 3].repeatStream,
1347          amped = arr4.amplifier(4);
1348 
1349     assert(amped.read(new int[4]) == [0, 4, 8, 12]);
1350     static assert(isInplaceComputableStream!(typeof(arr4)));
1351     static assert(isInplaceComputableStream!(typeof(amped), "*"));
1352     static assert(isInplaceComputableStream!(typeof(amped), "/"));
1353 
1354     auto arr = [3, 4, 1, 2];
1355     assert(amped.readOp!"*"(arr) == [0, 16, 8, 24]);
1356 }
1357 
1358 
1359 auto selector(Sg...)(Sg sgs)
1360 {
1361     static struct Selector()
1362     {
1363         import std..string : format;
1364 
1365 
1366         auto front() const @property
1367         {
1368             switch(_selectIndex){
1369               foreach(i, E; Sg)
1370                 mixin(format("case %s: return _sgs[%s].front;", i, i));
1371               default: assert(0);
1372             }
1373         }
1374 
1375 
1376         void popFront()
1377         {
1378             switch(_selectIndex){
1379               foreach(i, E; Sg)
1380                 mixin(format("case %s: _sgs[%s].popFront();", i, i));
1381               default: assert(0);
1382             }
1383         }
1384 
1385 
1386         bool empty() const @property
1387         {
1388             switch(_selectIndex){
1389               foreach(i, E; Sg)
1390                 mixin(format("case %s: return _sgs[%s].empty;", i, i));
1391               default: assert(0);
1392             }
1393         }
1394 
1395 
1396         bool fetch()
1397         {
1398             switch(_selectIndex){
1399               foreach(i, E; Sg)
1400                 mixin(format("case %s: return _sgs[%s].fetch();", i, i));
1401               default: assert(0);
1402             }
1403         }
1404 
1405 
1406         E[] read(E)(E[] buf)
1407         {
1408             switch(_selectIndex){
1409               foreach(i, E; Sg)
1410                 mixin(format("case %s: return _sgs[%s].read(buf);", i, i));
1411               default: assert(0);
1412             }
1413         }
1414 
1415 
1416         E[] readOp(alias op, E)(E[] buf)
1417         {
1418             switch(_selectIndex){
1419               foreach(i, E; Sg)
1420                 mixin(format("case %s: return _sgs[%s].readOp!op(buf);", i, i));
1421               default: assert(0);
1422             }
1423         }
1424 
1425 
1426         //mixin(defaultStreamSIMDOperator);
1427 
1428 
1429         void select(size_t i)
1430         in{
1431             assert(i < Sg.length);
1432         }
1433         body{
1434             _selectIndex = i;
1435         }
1436 
1437 
1438         void select(size_t i)()
1439         {
1440             static assert(i < Sg.length);
1441             _selectIndex = i;
1442         }
1443 
1444 
1445       private:
1446         Sg _sgs;
1447         size_t _selectIndex;
1448     }
1449 
1450 
1451     return Selector!()(sgs, 0);
1452 }
1453 
1454 ///
1455 unittest
1456 {
1457     scope(failure) {writefln("Unittest failure :%s(%s)", __FILE__, __LINE__); stdout.flush();}
1458     scope(success) {writefln("Unittest success :%s(%s)", __FILE__, __LINE__); stdout.flush();}
1459 
1460     auto arr1 = 0.repeatStream,
1461          arr2 = [0, 1].repeatStream,
1462          arr3 = [0, 1, 2].repeatStream,
1463          arr4 = [0, 1, 2, 3].repeatStream;
1464 
1465     auto slt = selector(arr1, arr2, arr3, arr4);
1466     assert(slt.read(new int[4]) == [0, 0, 0, 0]);
1467     slt.select(1); // or slt.select!1;
1468     assert(slt.read(new int[4]) == [0, 1, 0, 1]);
1469     slt.select(2); // or slt.select!2;
1470     assert(slt.read(new int[4]) == [0, 1, 2, 0]);
1471     slt.select(3); // or slt.select!3;
1472     assert(slt.read(new int[4]) == [0, 1, 2, 3]);
1473 }
1474 
1475 
1476 /**
1477 FIRフィルタを構成します。
1478 
1479 FIRフィルタの一般形は、z変換すれば次のような式で表すことができます。
1480 H[z] = Σ(k[m]*z^(-m))
1481 
1482 この関数には、各タップの係数`k[m]`を指定することで任意のFIRフィルタを構築することができます。
1483 */
1484 auto firFilter(alias reduceFn = "a+b*c", Sg, E)(Sg sg, const E[] taps)
1485 if(isInputStream!Sg)
1486 {
1487     return firFilter!(reduceFn, Sg, Unqual!E)(sg, taps, new Unqual!E[](taps.length));
1488 }
1489 
1490 
1491 ///
1492 auto firFilter(alias reduceFn = "a+b*c", Sg, E)(Sg sg, const E[] tap, E[] buf)
1493 if(isInputStream!Sg && is(E == Unqual!E))
1494 in{
1495     assert(tap.length == buf.length);
1496 }
1497 body{
1498     static struct FIRFiltered()
1499     {
1500         E front() const @property @trusted
1501         {
1502             E a = 0;
1503 
1504             {
1505                 auto p1 = _tap[$ - 1 .. $].ptr,
1506                      p2 = _buf[_idx .. $].ptr,
1507                      e2 = _buf[$ .. $].ptr;
1508 
1509                 while(p2 != e2){
1510                     a = naryFun!reduceFn(a, *p2, *p1);
1511                     --p1;
1512                     ++p2;
1513                 }
1514             }
1515 
1516             {
1517                 auto p1 = _tap.ptr + _idx-1,
1518                      p2 = _buf[0 .. _idx].ptr,
1519                      e2 = _buf[_idx .. $].ptr;
1520 
1521                 while(p2 != e2){
1522                     a = naryFun!reduceFn(a, *p2, *p1);
1523                     --p1;
1524                     ++p2;
1525                 }
1526             }
1527 
1528             return a;
1529         }
1530 
1531 
1532         void popFront()
1533         {
1534             _buf[_idx] = _sg.front;
1535             ++_idx;
1536             _idx %= _buf.length;
1537             _sg.popFront();
1538         }
1539 
1540       static if(isInfinite!Sg)
1541         enum bool empty = false;
1542       else
1543         bool empty() const @property { return _sg.empty; }
1544 
1545 
1546         bool fetch() { return _sg.fetch(); }
1547 
1548 
1549         U[] readOp(alias op, U)(U[] buf)
1550         {
1551             auto p = buf.ptr,
1552                  e = buf[$ .. $].ptr;
1553 
1554             while(p != e && !this.empty){
1555                 binaryFunExt!op(*p, this.front);
1556                 this.popFront();
1557                 () @trusted { ++p; }();
1558             }
1559 
1560             return () @trusted { return buf[0 .. p - buf.ptr]; }();
1561         }
1562 
1563 
1564         U[] read(U)(U[] buf)
1565         {
1566             return readOp!""(buf);
1567         }
1568 
1569 
1570         //mixin(defaultStreamSIMDOperator);
1571 
1572 
1573       private:
1574         Sg _sg;
1575         const(E)[] _tap;
1576         E[] _buf;
1577         size_t _idx;
1578     }
1579 
1580 
1581     return FIRFiltered!()(sg, tap, buf, 0);
1582 }
1583 
1584 ///
1585 unittest
1586 {
1587     scope(failure) {writefln("Unittest failure :%s(%s)", __FILE__, __LINE__); stdout.flush();}
1588     scope(success) {writefln("Unittest success :%s(%s)", __FILE__, __LINE__); stdout.flush();}
1589 
1590     auto arr = [0, 1, 2, 3].repeatStream,
1591          flt1 = arr.firFilter([0, 1]);
1592 
1593     assert(flt1.read(new int[8]) == [0, 0, 0*1+1*0, 1, 2, 3, 0, 1]);
1594 
1595     auto flt2 = arr.firFilter([1, 2]);
1596     assert(flt2.read(new int[6]) == [0, 0, 0*2+1*1, 1*2+2*1, 2*2+3*1, 3*2+0*1]);
1597 }
1598 
1599 
1600 /*
1601 IIRフィルタを構成します。
1602 
1603 IIRフィルタの一般形は、z変換すれば次のような式で表すことができます。
1604 H(z) = 1/Σ(k[m]*z^(-m))
1605 
1606 この関数には、各タップの係数`k[m]`を指定することで任意のIIRフィルタを構築することができます。
1607 */
1608 //auto iirFilter(alias reduceFn = "a+b*c", Sg, E)(Sg sg, const E[] taps)
1609 //if(isInputStream!Sg)
1610 //{
1611 //    return iirFilter!(reduceFn, Sg, Unqual!E)(sg, taps, new Unqual!E()(taps.length));
1612 //}
1613 
1614 /////
1615 //auto iirFilter(alias reduceFn = "a+b*c", Sg, E)(Sg sg, const E[] taps, E[] buf)
1616 //if(isInputStream!Sg && is(E == Unqual!E))
1617 //{
1618 
1619 //}
1620 
1621 
1622 /**
1623 信号の絶対値の最大値を`limit`にするように線形に振幅を小さく、もしくは大きくします。
1624 */
1625 auto normalizer(Sg, E)(Sg sg, E limit)
1626 if(isFloatingPoint!E)
1627 {
1628     static struct Normalizer()
1629     {
1630         E front() @property
1631         {
1632             auto f = _sg.front,
1633                  a = abs(f);
1634 
1635             if(f == 0) return 0;
1636 
1637             if(a > _max){
1638                 _max = a;
1639 
1640                 return _lim;
1641             }
1642 
1643             return f / _max * _lim;
1644         }
1645 
1646 
1647         auto empty() const @property { return _sg.empty; }
1648         void popFront() { _sg.popFront(); }
1649 
1650         bool fetch() { return _sg.fetch(); }
1651 
1652 
1653         U[] read(U)(U[] buf)
1654         {
1655             buf = _sg.read(buf);
1656 
1657             auto p = buf.ptr,
1658                  e = buf[$ .. $].ptr;
1659 
1660             while(p != e){
1661                 immutable v = abs(*p);
1662                 if(v == 0)
1663                     *p = 0;
1664                 else{
1665                     if(v > _max)
1666                         _max = v;
1667 
1668                     *p *= _lim / _max;
1669                 }
1670 
1671                 () @trusted { ++p; }();
1672             }
1673 
1674             return buf;
1675         }
1676 
1677 
1678         U[] readOp(alias op : "", U)(U[] buf)
1679         {
1680             return this.read(buf);
1681         }
1682 
1683 
1684         E[] readOp(alias op, E)(E[] buf)
1685         {
1686             auto p = buf.ptr,
1687                  e = buf[$ .. $].ptr;
1688 
1689             while(p != e && !this.empty){
1690                 binaryFunExt!op(*p, this.front);
1691                 this.popFront();
1692                 () @trusted { ++p; }();
1693             }
1694 
1695             return buf;
1696         }
1697 
1698         //mixin(defaultStreamSIMDOperator);
1699 
1700 
1701       private:
1702         Sg _sg;
1703         E _lim;
1704         E _max;
1705     }
1706 
1707     return Normalizer!()(sg, limit, 0);
1708 }
1709 
1710 ///
1711 unittest
1712 {
1713     scope(failure) {writefln("Unittest failure :%s(%s)", __FILE__, __LINE__); stdout.flush();}
1714     scope(success) {writefln("Unittest success :%s(%s)", __FILE__, __LINE__); stdout.flush();}
1715 
1716     auto arr = [0, 1, -2, 3].repeatStream,
1717          nlz = arr.normalizer(1.5);
1718 
1719     assert(equal!approxEqual(nlz.read(new float[8]), [0, 1.5, -1.5, 1.5, 0, 0.5, -1.0, 1.5]));
1720 }
1721 
1722 
1723 
1724 /**
1725 信号をマッピングします
1726 */
1727 auto mapper(alias fn, Sg)(Sg sg, ElementType!Sg[] buf = null, size_t bufSize = 1024)
1728 if(isInputStream!Sg && is(typeof({sg.read(buf);})))
1729 {
1730     if(buf is null)
1731         buf.length = bufSize;
1732 
1733     static struct Result
1734     {
1735         alias E = ElementType!Sg;
1736 
1737         auto front() @property { return unaryFun!fn(_sg.front); }
1738         bool empty() const @property { return _sg.empty; }
1739         void popFront() { _sg.popFront(); }
1740 
1741         bool fetch() { return _sg.fetch(); }
1742 
1743         U[] readOp(alias op, U)(U[] outbuf)
1744         {
1745             if(_buf.length < outbuf.length)
1746                 _buf.length = outbuf.length;
1747 
1748             auto ib = _buf[0 .. outbuf.length];
1749             ib = _sg.read(ib);
1750 
1751             auto p = ib.ptr,
1752                  e = ib[$ .. $].ptr,
1753                  q = outbuf.ptr;
1754 
1755             while(p != e){
1756                 binaryFunExt!op(*q, unaryFun!fn(*p));
1757                 () @trusted { ++p; ++q; }();
1758             }
1759 
1760             return outbuf[0 .. ib.length];
1761         }
1762 
1763 
1764         U[] read(U)(U[] buf)
1765         {
1766             return readOp!""(buf);
1767         }
1768 
1769         //mixin(defaultStreamSIMDOperator);
1770 
1771 
1772       private:
1773         Sg _sg;
1774         E[] _buf;
1775     }
1776 
1777     return Result(sg, buf);
1778 }
1779 
1780 
1781 /**
1782 
1783 */
1784 auto arrayMapper(Sg, E)(Sg sg, in E[] arr, size_t[] buf = null, size_t bufSize = 1024)
1785 if(isInputStream!Sg && is(ElementType!Sg : size_t))
1786 {
1787     if(buf is null)
1788         buf.length = bufSize;
1789 
1790     static struct Result
1791     {
1792         auto front() @property { return _arr[_sg.front]; }
1793         bool empty() const @property { return _sg.empty; }
1794         void popFront() { _sg.popFront(); }
1795 
1796         bool fetch() { return _sg.fetch(); }
1797 
1798         U[] readOp(alias op, U)(U[] outbuf)
1799         {
1800             if(_buf.length < outbuf.length)
1801                 _buf.length = outbuf.length;
1802 
1803             auto ib = _buf[0 .. outbuf.length];
1804             ib = _sg.read(ib);
1805 
1806             auto p = ib.ptr,
1807                  e = ib[$ .. $].ptr,
1808                  q = outbuf.ptr;
1809 
1810             while(p != e){
1811                 binaryFunExt!op(*q, _arr[*p]);
1812                 () @trusted { ++p; ++q; }();
1813             }
1814 
1815             return outbuf[0 .. ib.length];
1816         }
1817 
1818 
1819         U[] read(U)(U[] buf)
1820         {
1821             return readOp!""(buf);
1822         }
1823 
1824 
1825         //mixin(defaultStreamSIMDOperator);
1826 
1827 
1828       private:
1829         Sg _sg;
1830         const(E)[] _arr;
1831         size_t[] _buf;
1832     }
1833 
1834     return Result(sg, arr, buf);
1835 }