1 // Written in the D programming language.
2 /*
3 NYSL Version 0.9982
4 
5 A. This software is "Everyone'sWare". It means:
6   Anybody who has this software can use it as if he/she is
7   the author.
8 
9   A-1. Freeware. No fee is required.
10   A-2. You can freely redistribute this software.
11   A-3. You can freely modify this software. And the source
12       may be used in any software with no limitation.
13   A-4. When you release a modified version to public, you
14       must publish it with your name.
15 
16 B. The author is not responsible for any kind of damages or loss
17   while using or misusing this software, which is distributed
18   "AS IS". No warranty of any kind is expressed or implied.
19   You use AT YOUR OWN RISK.
20 
21 C. Copyrighted to Kazuki KOMATSU
22 
23 D. Above three clauses are applied both to source and binary
24   form of this software.
25 */
26 
27 /**
28 このモジュールは、信号処理のための試験的なモジュールです。
29 このモジュールで定義されているストリームは、Rangeに拡張を加える事で大量のデータを処理することに最適です。
30 また、状況によってはSIMD最適化を狙えるよう設計になっています。
31 */
32 module carbon.stream;
33 
34 import carbon.math,
35        carbon.functional,
36        carbon.nonametype;
37 
38 import std.algorithm,
39        std.container,
40        std.file,
41        std.functional,
42        std.math,
43        std.range,
44        std.stdio,
45        std.traits;
46 
47 
48 /**
49 入力ストリームは、`read`メソッドに与えられたバッファに要素を格納可能な型です。
50 実行時には、次のような特徴を満たさなければいけません。
51 
52 + `stream.empty`は、そのストリームからノンブロッキングで一要素でも取り出せる場合には`false`となる。
53 + `stream.front`は、そのストリームからノンブロッキングで一要素取り出す。
54 + `stream.popFront`は、そのストリームをノンブロッキングでひとつ進める。
55 + `stream.length`が有効である場合、この値はノンブロッキングで読み出すことができる要素数となる。
56 + `stream.read(buf)`は、ノンブロッキングで処理しなければいけない。
57 + `stream.read(buf)`は、bufのうち先頭から読み込みに成功した要素数だけのスライスを返す。
58     - `stream.length >= buf.length`のとき、常に、`buf.length == stream.read(buf).length`となる。
59 //+ `stream.closed`は、いくら待ってもそれ以上要素が取り出せない状況で`true`となる。
60 //    - `stream.closed`が真の場合、`stream.empty`であり、`lenght`が有効であれば`stream.length == 0`を満たす。
61 + `stream.fetch()`は、`stream.empty`が偽になるまで、
62     もしくはそのストリームからは今後一切データを取り出せないと判断した時点で処理を返す。
63     `stream.fetch()`は、待機後の`stream.empty()`を返す。
64 + `stream`が無限レンジの場合、`stream.read(buf)`は常に`buf`の全要素に読み込む。
65 */
66 enum bool isInputStream(T) = isInputRange!T && is(typeof((T t){
67         alias E = Unqual!(ElementType!T);
68         while(!t.empty || !t.fetch()) E[] var = t.read(new E[1024]);
69     }));
70 
71 
72 /**
73 入力ストリームから、ストリーム終端になるまで可能な限り読み出します。
74 */
75 E[] fillBuffer(S, E)(ref S s, E[] buf)
76 if(isInputStream!S)
77 {
78     E[] rem = buf;
79     while(rem.length && (!s.empty || !s.fetch()))
80         rem = s.read(rem);
81     
82     return buf[0 .. $ - rem.length];
83 }
84 
85 
86 /**
87 演算可能なストリームとは、`readOp`メソッドに与えられたバッファ上に、計算した要素を格納可能な入力ストリームです。
88 これは、次のような実行時条件を満たしている必要があります。
89 
90 + `stream.readOp!"op"(buf)`は、`a op= b`という演算をbuf上で行う。
91     つまり、
92     - op : "+"のとき、streamから読み出された値を、bufに加える演算を表す。
93     - op : "-"のとき、streamから読み出された値を、bufから引く演算を表す。
94     + op : ""のときは、`stream.read(buf)`と等価です。
95 + `stream.readOp!func(buf)`は、`func(a, b)`という演算結果をbufに格納する
96     - `stream.readOp!((a, b) => b)(buf)`は、`stream.read(buf)`と等価です。
97     - `stream.readOp!((a, b) => a + b)(buf)`は、`stream.readOp!"+"(buf)`と等価です。
98 */
99 enum bool isInplaceComputableStream(T, alias op = "", U = Unqual!(ElementType!T)) = isInputStream!T &&
100     is(typeof((T t, U[] buf){
101         buf = t.readOp!op(buf);   // as read(buf)
102     }));
103 
104 
105 /**
106 出力ストリームは、`t.write`メソッドに与えられたバッファを出力可能な型です。
107 実行時には次のような条件を満たしている必要があります。
108 
109 + `stream.write(buf)`は、ノンブロッキングでバッファを出力し、まだ出力できていないバッファを返す。
110 + `stream.fill`が偽であれば、一要素でも`stream.write`でノンブロッキング出力可能である
111 + `stream.flush()`は、ブロッキング処理で`t.fill`が偽になるか、
112     もしくはそれ以上出力不可能だと判断した時点で、`t.fill`を返す。
113 */
114 enum bool isOutputStream(T, E) = 
115     is(typeof((T t, E e){
116         E[] writeData = new E[1024];
117         while(writeData.length && (!t.fill || !t.flush())) writeData = t.write(writeData);
118     }));
119 
120 
121 /**
122 */
123 E[] fillBufferOp(alias op, S, E)(ref S s, E[] buffer)
124 {
125     auto rem = buffer;
126     while(rem.length && (!s.empty || !s.fetch()))
127         rem = s.readOp!op(rem);
128     return buffer[0 .. $ - rem.length];
129 }
130 
131 
132 /**
133 内部にバッファを持つような入力ストリームです。
134 
135 + `stream.availables`は内部のバッファを返す。
136 + `stream.consume(size)`は、内部バッファの先頭から`size`要素だけを捨てる。
137 */
138 enum bool isBufferedInputStream(T) = isInputStream!T && 
139     is(typeof((T t){
140         alias E = ElementType!T;
141         const(E)[] buf = t.availables;
142         t.consume(buf.length);
143     }));
144 
145 
146 /**
147 内部にバッファを持つような出力ストリームです。
148 
149 + `stream.buffer`は内部のバッファを返す。
150 + `stream.flush(size)`は、内部バッファの先頭から`size`要素だけ出力する。
151 */
152 enum bool isBufferedOutputStream(T) = isOutputStream!T &&
153     is(typeof((T t){
154         auto buf = t.buffer;
155         static assert(is(typeof(buf) : E[], E));
156         static assert(is(typeof(buf) == Unqual!(typeof(buf))));
157         t.flush(buf.length);
158     }));
159 
160 
161 /**
162 InplaceComputableStreamを作成する際に有用な、std.functional.binaryFunの拡張です
163 */
164 auto ref A binaryFunExt(alias op, A, B)(auto ref A a, auto ref B b)
165 if(is(typeof(mixin(`a` ~ op ~ `=b`))))
166 {
167     mixin(`a` ~ op ~ "=b;");
168     return a;
169 }
170 
171 
172 /// ditto
173 auto ref A binaryFunExt(alias func, A, B)(auto ref A a, auto ref B b)
174 if(is(typeof(a = naryFun!func(a, forward!b))))
175 {
176     a = naryFun!func(a, forward!b);
177     return a;
178 }
179 
180 
181 /**
182 レンジをisInplaceComputableStreamに変換します
183 */
184 auto toStream(R)(R range)
185 if(isInputRange!R && !isInputStream!R)
186 {
187     alias E = Unqual!(ElementType!R);
188 
189     static struct InputStreamRange()
190     {
191         R range;
192         alias range this;
193         alias closed = range.empty;
194         bool fetch(){ return range.empty; }
195 
196         T[] readOp(alias func, T)(T[] buf)
197         {
198             auto p = buf.ptr;
199             const end = () @trusted { return p + buf.length; }();
200             while(p != end && !range.empty){
201                 binaryFunExt!func(*p, range.front);
202                 range.popFront();
203                 () @trusted { ++p; }();
204             }
205 
206             return () @trusted { return buf[0 .. cast(size_t)(p - buf.ptr)]; }();
207         }
208 
209         T[] read(T)(T[] buf) { return readOp!""(buf); }
210     }
211 
212 
213     return InputStreamRange!()(range);
214 }
215 
216 
217 /**
218 正確なComplexNCO
219 */
220 auto preciseComplexNCO(real freq, real deltaT, real theta = 0) pure nothrow @safe @nogc
221 {
222     static struct PreciseComplexNCO()
223     {
224         creal front() const @property { return this[0]; }
225         void popFront() { _theta += _dt2PI * _freq; }
226         enum bool empty = false;
227         PreciseComplexNCO!() save() const @property { return this; }
228         creal opIndex(size_t i) const @property { return std.math.expi(_theta + i * _dt2PI * _freq); }
229         struct OpDollar{} enum OpDollar opDollar = OpDollar();
230         PreciseComplexNCO!() opSlice() const { return this; }
231         PreciseComplexNCO!() opSlice(size_t i, OpDollar) const { PreciseComplexNCO!() dst = this; dst._theta += i * _dt2PI * _freq; return dst; }
232         auto opSlice(size_t i, size_t j) const { return this[i .. $].take(j - i); }
233 
234         bool fetch() { return false; }
235 
236         T[] readOp(alias op, T)(T[] buf)
237         {
238             auto p = buf.ptr;
239             const end = () @trusted { return p + buf.length; }();
240             while(p != end){
241                 binaryFunExt!op(*p, std.math.expi(_theta));
242                 _theta += _dt2PI * _freq;
243                 () @trusted { ++p; }();
244             }
245 
246             return buf;
247         }
248 
249 
250         Cpx[] read(Cpx)(Cpx[] buf){ return readOp!""(buf); }
251 
252       @property
253       {
254         real freq() const @property { return _freq; }
255         void freq(real f) @property { _freq = f; }
256 
257         real deltaT() const @property { return _dt2PI / 2 / PI; }
258         void deltaT(real dt) @property { _dt2PI = dt * 2 * PI; }
259       }
260 
261       private:
262         real _freq;
263         real _dt2PI;
264         real _theta;
265     }
266 
267     return PreciseComplexNCO!()(freq, deltaT * 2 * PI, theta);
268 }
269 
270 
271 ///
272 unittest
273 {
274     auto sig1 = preciseComplexNCO(1000, 0.001, 0);
275     static assert(isInputStream!(typeof(sig1)));
276     static assert(isInplaceComputableStream!(typeof(sig1)));
277     static assert(is(ElementType!(typeof(sig1)) == creal));
278 
279     assert(equal!((a, b) => approxEqual(a.re, b.re) && approxEqual(a.im, b.im))
280                 (sig1[0 .. 4], cast(creal[])[1, 1, 1, 1]));
281 
282     sig1 = preciseComplexNCO(1, 0.25, 0);
283     assert(equal!((a, b) => approxEqual(a.re, b.re) && approxEqual(a.im, b.im))
284                 (sig1[0 .. 4], cast(creal[])[1, 0+1i, -1, -1i]));
285 
286     sig1 = preciseComplexNCO(1000, 10.0L^^-6, std.math.E);
287     auto buf = sig1[0 .. 1024].array;
288     assert(sig1.readOp!"-"(buf).length == buf.length);
289     assert(equal!"a == 0"(buf, buf));
290 
291     sig1.readOp!"a+b"(buf);
292 }
293 
294 
295 /**
296 Lookup-Table方式の高速な局部発振器を提供します。
297 テンプレートパラメータの`func`には周期2PIの周期関数を与えることが出来ます。
298 たとえば、`std.math.expi`を与えれば複素発振器となり、`std.math.sin`であれば正弦波を出力します。
299 
300 この局部発振器は周波数の動的な制御が可能なので、信号の周波数をフィードバック制御する際に用いることができます。
301 Lookup-Tableは、テンプレートパラメータ毎にプログラムの初期化時に生成されるので、
302 最初の初期化が終われば、実行コストはテーブルの参照のみになり、高速にアクセス可能です。
303 また、テーブル長を伸ばしても初期化コストが増加するだけで、実行コストはあまり大きくならないことも特徴です。
304 */
305 template lutNCO(alias func, size_t divN)
306 if(isPowOf2(divN))
307 {
308     alias E = typeof(func(0.0));
309     immutable(E[]) table;
310 
311     shared static this()
312     {
313         E[divN] t;
314         foreach(i, ref e; t)
315             e = func(i * 2 * PI / divN);
316 
317         table = t.dup;
318     }
319 
320 
321     struct LutNCO()
322     {
323         E front() const @property { return table[cast(ptrdiff_t)(_phase) & (divN - 1)]; }
324         void popFront() { _phase += _freq * _deltaT * divN; _phase %= divN; }
325         enum bool empty = false;
326         LutNCO save() const @property { return this; }
327         E opIndex(size_t i) const { return table[cast(ptrdiff_t)(_phase + i * _freq * _deltaT * divN) & (divN - 1)]; }
328         struct OpDollar{} enum opDollar = OpDollar();
329         LutNCO opSlice() const { return this; }
330         LutNCO opSlice(size_t i, OpDollar) const { auto dst = this[]; dst._phase += i * _freq * _deltaT; return dst; }
331         auto opSlice(size_t i, size_t j) const { return this[i .. $].take(j - i); }
332 
333         real freq() const @property { return _freq; }
334         void freq(real f) @property { _freq = f; }
335         real deltaT() const @property { return _deltaT; }
336         void deltaT(real t) @property { _deltaT = t; }
337 
338         bool fetch() { return false; }
339 
340         T[] readOp(alias op, T)(T[] buf)
341         if(is(typeof(binaryFunExt!op(buf[0], table[0]))))
342         {
343             immutable dph = _freq * _deltaT * divN;
344             auto p = buf.ptr;
345             const end = () @trusted { return p + buf.length; }();
346             while(p != end){
347                 binaryFunExt!op(*p, table[cast(ptrdiff_t)(_phase) & (divN - 1)]);
348                 _phase += dph;
349                 () @trusted { ++p; }();
350             }
351 
352             return buf[0 .. $];
353         }
354 
355 
356         T[] read(T)(T[] buf) @safe { return readOp!""(buf); }
357 
358       private:
359         real _phase;
360         real _freq;
361         real _deltaT;
362     }
363 
364 
365     LutNCO!() lutNCO(real freq, real deltaT, real theta = 0) pure nothrow @safe @nogc
366     {
367         return LutNCO!()(theta, freq, deltaT);
368     }
369 }
370 
371 
372 ///
373 unittest{
374     auto sig1 = lutNCO!(std.math.expi, 4)(1, 0.25, 0);
375     static assert(isInputStream!(typeof(sig1)));
376     static assert(isInplaceComputableStream!(typeof(sig1)));
377     static assert(is(ElementType!(typeof(sig1)) == creal));
378 
379     assert(equal!((a, b) => approxEqual(a.re, b.re) && approxEqual(a.im, b.im))
380         (sig1[0 .. 1024], preciseComplexNCO(1, 0.25, 0)[0 .. 1024]));
381 
382     sig1 = lutNCO!(std.math.expi, 4)(1000, 10.0L^^-6, std.math.E);
383     auto buf = sig1[0 .. 1024].array;
384     assert(sig1.readOp!"-"(buf).length == buf.length);
385     assert(equal!"a == 0"(buf, buf));
386 }
387 
388 
389 /**
390 永遠とその配列をループし続けるストリーム
391 */
392 auto repeatStream(E)(const E[] array)
393 in{
394     assert(array.length);
395 }
396 body{
397     static struct RepeatStream()
398     {
399         const(E) front() const @property { return _arr[_pos]; }
400         enum empty = false;
401         void popFront() { ++_pos; _pos %= _arr.length; }
402         RepeatStream!() save() const @property { return this; }
403         const(E) opIndex(size_t i) const { return _arr[_pos % $]; }
404         RepeatStream!() opSlice(){ return this; }
405         struct OpDollar{} enum opDollar = OpDollar();
406         RepeatStream!() opSlice(size_t i, OpDollar){ typeof(return) dst; dst._pos = (_pos + i) % _arr.length; return dst; }
407         auto opSlice(size_t i, size_t j){ return this[i .. $].take(j - i); }
408 
409         bool fetch() { return false; }
410 
411         U[] readOp(alias op, U)(U[] buf)
412         {
413             auto rem = buf;
414             while(rem.length){
415                 auto minL = min(rem.length, _arr.length - _pos);
416 
417               static if(is(typeof({ mixin(`rem[]` ~ op ~ `=_arr[];`); })))
418                 mixin(`rem[0 .. minL] `~ op ~ "= _arr[_pos .. _pos + minL];");
419               else{
420                 auto rem_ptr = rem.ptr,
421                      buf_ptr = () @trusted { return _arr.ptr + _pos; }();
422                 const rem_end = () @trusted { return rem_ptr + minL; }();
423 
424                 while(rem_ptr != rem_end){
425                     binaryFunExt!op(*rem_ptr, *buf_ptr);
426                     () @trusted { ++rem_ptr; ++buf_ptr; }();
427                 }
428               }
429               _pos += minL;
430               _pos %= _arr.length;
431               rem = rem[minL .. $];
432             }
433 
434             return buf;
435         }
436 
437 
438         U[] read(U)(U[] buf) { return readOp!""(buf); }
439 
440 
441         const(E)[] availables() @property { return _arr[_pos .. $]; }
442         void consume(size_t n) {
443             _pos += n;
444             _pos %= _arr.length;
445         }
446 
447       private:
448         const(E)[] _arr;
449         size_t _pos;
450     }
451 
452 
453     return RepeatStream!()(array, 0);
454 }
455 
456 
457 /// ditto
458 auto repeatStream(E)(E element)
459 if(!is(E : T[], T))
460 {
461     static struct RepeatStream()
462     {
463         enum bool empty = false;
464         auto front() const @property { return _e; }
465         void popFront() {}
466         auto save() @property { return this; }
467         auto save() const @property { return .repeatStream(_e); }
468         auto save() immutable @property { return .repeatStream(_e); }
469         auto opIndex(size_t) const @property { return _e; }
470         RepeatStream!() opSlice() const { return this; }
471         struct OpDollar{} enum opDollar = OpDollar();
472         RepeatStream!() opSlice(size_t, OpDollar) const { return this; }
473         auto opSlice(size_t i , size_t j) const { return this.take(j - i); }
474 
475         bool fetch() { return false; }
476 
477 
478         U[] readOp(alias op, U)(U[] buf)
479         {
480           static if(is(typeof({ mixin(`buf[] ` ~ op ~ "= _e;"); })))
481             mixin(`buf[] ` ~ op ~ "= _e;");
482           else
483           {
484             auto p = buf.ptr,
485                  e = () @trusted { return p + buf.length; }();
486 
487             while(p != e){
488                 binaryFunExt!op(*p, _e);
489                 () @trusted { ++p; }();
490             }
491           }
492 
493             return buf;
494         }
495 
496 
497         U[] read(U)(U[] buf)
498         {
499             return readOp!""(buf);
500         }
501 
502       private:
503         E _e;
504     }
505 
506 
507     return RepeatStream!()(element);
508 }
509 
510 ///
511 unittest{
512     int[] arr = [0, 1, 2, 3, 4, 5, 6, 7];
513     auto rs = arr.repeatStream;
514     static assert(isInfinite!(typeof(rs)));
515     static assert(isInputStream!(typeof(rs)));
516     static assert(isInplaceComputableStream!(typeof(rs)));
517     static assert(isBufferedInputStream!(typeof(rs)));
518 
519 
520     int[] buf1 = new int[24];
521     assert(rs.readOp!"+"(buf1) == arr ~ arr ~ arr);
522 
523     short[] buf2 = new short[17];
524     assert(rs.readOp!"cast(short)b"(buf2) == buf1[0 .. 17]);
525 }
526 
527 ///
528 unittest{
529     auto rs = 1.repeatStream;
530     static assert(isInfinite!(typeof(rs)));
531     static assert(isInputStream!(typeof(rs)));
532     static assert(isInplaceComputableStream!(typeof(rs)));
533 
534     assert(rs.read(new int[4]) == [1, 1, 1, 1]);
535 }
536 
537 
538 /**
539 一度に巨大なファイルを読み込むことに特化したバッファ持ち入力ストリームです。
540 */
541 auto rawFileStream(T)(string filename, size_t bufferSize = 1024 * 1024)
542 if(is(Unqual!T == T))
543 {
544     static struct RawFileStream()
545     {
546         private void refill()
547         {
548             _pos = 0;
549             if(!_file.isOpen){
550                 _buffer = null;
551                 return;
552             }
553 
554             _pos = 0;
555             immutable beforeSize = _buffer.length;
556             _buffer = _file.rawRead(_buffer);
557             if(_buffer.length != beforeSize)
558                 _file.detach();
559         }
560 
561 
562         /** 入力レンジのプリミティブ
563         */
564         T front() const @property { return _buffer[_pos]; }
565         bool empty() const @property { return _buffer.length == _pos; }     /// ditto
566         void popFront() { ++_pos; }                                         /// ditto
567         size_t length() const @property { return _buffer.length - _pos; }   /// ditto
568 
569 
570         /**
571         入力ストリームのプリミティブ
572         */
573         bool fetch()
574         {
575             if(!this.empty) return false;
576             refill();
577             return this.empty;
578         }
579 
580 
581         /// ditto
582         E[] readOp(alias func, E)(E[] buf)
583         if(is(typeof(binaryFunExt!func(buf[0], _buffer[0]))))
584         {
585             immutable minL = min(buf.length, _buffer.length - _pos);
586           static if(is(typeof(mixin(`buf[]` ~ func ~ `= _buffer[]`))))
587             mixin(`buf[0 .. minL] ` ~ func ~ "= _buffer[_pos .. _pos + minL];");
588           else
589           {
590             auto rem_ptr = buf.ptr,
591                  buf_ptr = () @trusted { return _buffer.ptr + _pos; }();
592             const rem_end = () @trusted { return rem_ptr + minL; }();
593 
594             while(rem_ptr != rem_end){
595                 binaryFunExt!func(*rem_ptr, *buf_ptr);
596                 () @trusted { ++rem_ptr; ++buf_ptr; }();
597             }
598           }
599             _pos += minL;
600             return buf[0 .. minL];
601         }
602 
603 
604         /// ditto
605         E[] read(E)(E[] buf)
606         if(isAssignable!(E, T))
607         { return readOp!""(buf); }
608 
609 
610         /**
611         バッファ持ち入力レンジのプリミティブ
612         */
613         const(T)[] availables() const @property { return _buffer[_pos .. $]; }
614 
615 
616         /**
617         バッファ持ち入力レンジのプリミティブ
618         */
619         void consume(size_t n)
620         {
621             while(n && !this.empty){
622                 if(n >= _buffer.length - _pos){
623                     n -= _buffer.length - _pos;
624                     if(_file.isOpen)
625                         refill();
626                     else{
627                         _buffer = null;
628                         _pos = 0;
629                     }
630                 }
631                 else{
632                     _pos += n;
633                     n = 0;
634                 }
635             }
636         }
637 
638 
639       private:
640         T[] _buffer;
641         size_t _pos;
642         File _file;
643     }
644 
645 
646     auto dst = RawFileStream!()(new T[bufferSize], 0,File(filename));
647     dst.refill();
648     return dst;
649 }
650 
651 ///
652 unittest{
653     immutable fname = "testData.dat";
654     std.file.write(fname, cast(ubyte[])[0, 1, 2, 3, 4, 5]); // 6byte
655     scope(exit)
656         std.file.remove(fname);
657 
658     auto sig1 = rawFileStream!int(fname);
659     static assert(isInputStream!(typeof(sig1)));
660     static assert(isInplaceComputableStream!(typeof(sig1)));
661     static assert(isBufferedInputStream!(typeof(sig1)));
662 
663     assert(sig1.availables.length == 1);
664     sig1.consume(1);
665     assert(sig1.empty && sig1.fetch());
666 }
667 
668 ///
669 unittest{
670     immutable fname = "testData.dat";
671     std.file.write(fname, [0, 1, 2, 3, 4, 5]); // 24byte
672     scope(exit)
673         std.file.remove(fname);
674 
675     auto sig1 = rawFileStream!int(fname);
676     assert(sig1.availables == [0, 1, 2, 3, 4, 5]);
677     sig1.consume(6);
678     assert(sig1.empty && sig1.fetch());
679 }
680 
681 
682 /**
683 二つの信号の積を返します。
684 2つ目の信号は演算による理想信号でなければいけません。
685 */
686 auto mixer(Sg1, Sg2)(Sg1 sg1, Sg2 sg2)
687 if(isInputStream!Sg1 && isInplaceComputableStream!(Sg2, "*") && isInfinite!Sg2)
688 {
689     static struct Mixer()
690     {
691         auto ref front() const @property { return _sg1.front * _sg2.front; }
692 
693       static if(isInfinite!Sg1)
694         enum bool empty = false;
695       else
696         bool empty() const @property { return _sg1.empty; }
697 
698         void popFront() { _sg1.popFront(); _sg2.popFront(); }
699 
700         bool fetch()
701         {
702           static if(isInfinite!Sg1)
703             return false;
704           else
705           {
706             if(!_sg1.empty) return false;
707             //_sg2.fetch();
708             return _sg1.fetch();
709           }
710         }
711 
712         E[] read(E)(E[] buf)
713         {
714             return readOp!""(buf);
715         }
716 
717 
718         E[] readOp(alias op, E)(E[] buf)
719         if((op == "*" || op == "/")
720         && isInplaceComputableStream!(Sg1, op)
721         && isInplaceComputableStream!(Sg2, op))
722         {
723             return _sg2.readOp!op(_sg1.readOp!op(buf));
724         }
725 
726 
727         E[] readOp(alias op : "", E)(E[] buf)
728         {
729             return _sg2.readOp!"*"(_sg1.read(buf));
730         }
731 
732 
733       private:
734         Sg1 _sg1;
735         Sg2 _sg2;
736     }
737 
738 
739     return Mixer!()(sg1, sg2);
740 }
741 
742 
743 ///
744 unittest
745 {
746     auto arr1 = [0, 1, 0, 1, 0, 1, 0, 1].repeatStream;
747     auto arr2 = [0, 0, 1, 1, 0, 0, 1, 1].repeatStream;
748     auto mx1 = arr1.mixer(arr2);
749     static assert(isInfinite!(typeof(mx1)));
750     static assert(isInputStream!(typeof(mx1)));
751 
752     int[] buf1 = new int[16];
753     assert(mx1.read(buf1) == [0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 1]);
754 
755     auto arr3 = [0, 0, 0, 0, 1, 1, 1, 1].repeatStream;
756     auto mx2 = mx1.mixer(arr3);
757 
758     int[] buf2 = new int[16];
759     assert(mx2.read(buf2) == [0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1]);
760 }
761 
762 
763 /**
764 二つの信号を足します。
765 2つ目の信号は、演算により生成された理想信号でなければいけません。
766 */
767 auto adder(Sg1, Sg2)(Sg1 sg1, Sg2 sg2)
768 if(isInputStream!Sg1 && isInplaceComputableStream!(Sg2, "+") && isInfinite!Sg2)
769 {
770     static struct Adder()
771     {
772         auto ref front() const @property { return _sg1.front + _sg2.front; }
773 
774       static if(isInfinite!Sg1)
775         enum bool empty = false;
776       else
777         bool empty() const @property { return _sg1.empty; }
778 
779         void popFront() { _sg1.popFront(); _sg2.popFront(); }
780 
781         bool fetch() {
782           static if(isInfinite!Sg1)
783             return false;
784           else
785           {
786             if(!_sg1.empty) return false;
787             //_sg2.fetch();
788             return _sg1.fetch();
789           }
790         }
791 
792         E[] read(E)(E[] buf)
793         {
794             return readOp!""(buf);
795         }
796 
797 
798         E[] readOp(alias op, E)(E[] buf)
799         if((op == "+" || op == "-")
800         && isInplaceComputableStream!(Sg1, op)
801         && isInplaceComputableStream!(Sg2, op))
802         {
803             return _sg2.readOp!op(_sg1.readOp!op(buf));
804         }
805 
806 
807         E[] readOp(alias op : "", E)(E[] buf)
808         {
809             return _sg2.readOp!"+"(_sg1.read(buf));
810         }
811 
812 
813       private:
814         Sg1 _sg1;
815         Sg2 _sg2;
816     }
817 
818 
819     return Adder!()(sg1, sg2);
820 }
821 
822 
823 ///
824 unittest
825 {
826     auto arr1 = [0, 1, 0, 1, 0, 1, 0, 1].repeatStream;
827     auto arr2 = [0, 0, 1, 1, 0, 0, 1, 1].repeatStream;
828     auto mx1 = arr1.adder(arr2);
829     static assert(isInfinite!(typeof(mx1)));
830     static assert(isInputStream!(typeof(mx1)));
831 
832     int[] buf1 = new int[16];
833     assert(mx1.read(buf1) == [0, 1, 1, 2, 0, 1, 1, 2, 0, 1, 1, 2, 0, 1, 1, 2]);
834 }
835 
836 ///
837 unittest
838 {
839     auto arr1 = [0, 1, 0, 1, 0, 1, 0, 1].repeatStream;
840     auto buf1 = arr1.adder(arr1).adder(arr1).read(new int[16]);
841     auto buf2 = arr1.amplifier(3).read(new int[16]);
842 
843     assert(buf1 == buf2);
844 }
845 
846 
847 /**
848 積分器です。
849 積分するサンプル量が大きい場合の使用に適しています。
850 */
851 auto accumulator(E)(size_t integN)
852 {
853     static struct Accumulator
854     {
855         E[] write(E[] buf)
856         {
857             while(buf.length){
858                 immutable n = min(_integN - _cnt, buf.length);
859                 auto ptr = buf.ptr;
860                 const end = buf.ptr + n;
861                 auto val = _buffer.back;
862                 while(ptr != end){
863                     val += *ptr;
864                     ++ptr;
865                 }
866 
867                 _cnt += n;
868                 _buffer.back = val;
869                 buf = buf[n .. $];
870                 if(_cnt == _integN){
871                     _buffer ~= cast(E)0;
872                     _cnt = 0;
873                     ++_avaN;
874                 }
875             }
876 
877             return buf;
878         }
879 
880 
881         enum bool fill = false;
882         bool flush() { return false; }
883 
884 
885         auto opSlice(){ return _buffer[].take(_avaN); }
886 
887         E front() const @property { return _buffer.front; }
888         void popFront() { _buffer.removeFront(); --_avaN; }
889         bool empty() const @property { return _avaN == 0; }
890 
891 
892       private:
893         DList!E _buffer;
894         size_t _cnt;
895         size_t _integN;
896         size_t _avaN;
897     }
898 
899 
900     return Accumulator(DList!E(cast(E)0), 0, integN, 0);
901 }
902 
903 ///
904 unittest
905 {
906     auto acc = accumulator!int(4);
907     static assert(isOutputStream!(typeof(acc), int));
908 
909     assert(acc.write([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]).empty);
910     assert(equal(acc[], [0+1+2+3, 4+5+6+7]));
911 
912     acc.write([10, 11]);
913     assert(equal(acc[], [0+1+2+3, 4+5+6+7, 8+9+10+11]));
914 }
915 
916 
917 /**
918 積分器です。
919 たとえば、連続する少数の個数のサンプルの和を取るような用途に適しています。
920 */
921 auto accumulator(size_t N, Sg)(Sg sg, size_t bufSize = 1024 * 1024)
922 {
923     alias E = Unqual!(ElementType!Sg);
924     return accumulator(sg, new E[bufSize]);
925 }
926 
927 
928 /// ditto
929 auto accumulator(size_t N, Sg, E)(Sg sg, E[] buffer)
930 if(isInputStream!Sg && is(ElementType!Sg : E))
931 {
932     static struct Accumulator()
933     {
934         auto front() const @property { return availables[0]; }
935         void popFront() { consume(1); }
936         bool empty() const @property { return _pos == _end; }
937 
938         bool fetch()
939         {
940             if(!this.empty) return false;
941             auto p = _buffer.ptr + _buffer.length - _remN,
942                  f = _buffer.ptr;
943             const e = _buffer.ptr + _buffer.length;
944             while(p != e) { *f = *p; ++f; ++p; }
945             _pos = 0;
946             _end = 0;
947 
948             while(this.empty && (!_sg.empty || !_sg.fetch())){
949                 auto buf = _sg.read(_buffer[_remN .. $]);
950                 immutable size_t len = (_remN + buf.length) / N;
951                 _end += len;
952 
953                 auto pp = _buffer.ptr,
954                      ff = _buffer.ptr;
955                 auto ee = _buffer.ptr + _end;
956 
957                 while(pp != ee){
958                     size_t s = 1;
959                     if(pp != _buffer.ptr) { *pp = 0; s = 0; }
960 
961                     ff += s;
962                     foreach(i; s .. N){
963                         *pp += *ff;
964                         ++ff;
965                     }
966 
967                     ++pp;
968                 }
969                 _remN = (_remN + buf.length) - _end * N;
970             }
971 
972             return this.empty;
973         }
974 
975 
976         T[] readOp(alias func, T)(T[] buf)
977         {
978             immutable minL = min(buf.length, _end - _pos);
979           static if(is(typeof(mixin(`buf[]` ~ func ~ `= _buffer[]`))))
980             mixin(`buf[0 .. minL] ` ~ func ~ "= _buffer[_pos .. _pos + minL];");
981           else
982           {
983             auto rem_ptr = buf.ptr,
984                  buf_ptr = () @trusted { return _buffer.ptr + _pos; }();
985             const rem_end = () @trusted { return rem_ptr + minL; }();
986 
987             while(rem_ptr != rem_end){
988                 binaryFunExt!func(*rem_ptr, *buf_ptr);
989                 () @trusted { ++rem_ptr; ++buf_ptr; }();
990             }
991           }
992             _pos += minL;
993             return buf[0 .. minL];
994         }
995 
996 
997         T[] read(T)(T[] buf) { return readOp!""(buf); }
998 
999 
1000         const(E)[] availables() const @property { return _buffer[_pos .. _end]; }
1001         void consume(size_t n)
1002         {
1003             while(n && (!this.empty || !this.fetch())){
1004                 if(n <= _end - _pos){
1005                     _pos += n;
1006                     n = 0;
1007                 }else{
1008                     n -= _end - _pos;
1009                     _pos = _end;
1010                 }
1011             }
1012         }
1013 
1014 
1015       private:
1016         Sg _sg;
1017         E[] _buffer;
1018         size_t _pos;
1019         size_t _end;
1020         size_t _remN;
1021     }
1022 
1023 
1024     return Accumulator!()(sg, buffer, 0, 0, 0);
1025 }
1026 
1027 ///
1028 unittest
1029 {
1030     auto arr1 = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9].repeatStream;
1031     auto acc = accumulator!4(arr1, new int[14]);
1032     static assert(isBufferedInputStream!(typeof(acc)));
1033 
1034     int[] buf = new int[3];
1035     assert(!acc.empty || !acc.fetch());
1036     assert(acc.read(buf) == [0+1+2+3, 4+5+6+7, 8+9+0+1]);
1037     assert(!acc.empty || !acc.fetch());
1038     assert(acc.read(buf) == [2+3+4+5, 6+7+8+9, 0+1+2+3]);
1039 }
1040 
1041 /+ ベンチマーク, rangeの3倍程度の性能
1042 unittest {
1043     static real[] vector;
1044     vector = new real[1024 * 1024];
1045     foreach(ref e; vector) e = 0;
1046 
1047 
1048     void funcStream()
1049     {
1050         alias sin128LutNCO = lutNCO!(std.math.sin, 128);
1051 
1052         auto mx = mixer(sin128LutNCO(6.5E6L, 1E-6L, 0),
1053                         sin128LutNCO(6.6E6L, 1E-6L, 0));
1054 
1055         mx.readOp!"*"(vector);
1056     }
1057 
1058 
1059     void funcRange()
1060     {
1061         alias sin128LutNCO = lutNCO!(std.math.sin, 128);
1062 
1063         auto mx = zip(sin128LutNCO(6.5E6L, 1E-6L, 0), sin128LutNCO(6.6E6L, 1E-6L, 0))
1064                  .map!"a[0]*a[1]";
1065 
1066         auto ptr = vector.ptr, end = ptr + vector.length;
1067         while(ptr != end){
1068             *ptr *= mx.front;
1069             mx.popFront();
1070             ++ptr;
1071         }
1072     }
1073 
1074     
1075     import std.datetime;
1076     import std.container;
1077     auto ts = benchmark!(funcStream, funcRange)(26);
1078 
1079     writefln("%(\n%s%)", ts[].map!"a.usecs");
1080 }
1081 +/
1082 
1083 
1084 auto amplifier(Sg, F)(Sg sg, F gain)
1085 {
1086     static struct Amplifier()
1087     {
1088         auto front() const @property { return _sg.front * _gain; }
1089         void popFront() { _sg.popFront(); }
1090         bool empty() const { return _sg.empty; }
1091         bool fetch() { return _sg.fetch(); }
1092 
1093 
1094         E[] readOp(string op, E)(E[] buf)
1095         if((op == "*" || op == "/") && isInplaceComputableStream!(Sg, op, E))
1096         {
1097             buf = _sg.readOp!op(buf);
1098             mixin("buf[]" ~ op ~ "= _gain;");
1099 
1100             return buf;
1101         }
1102 
1103 
1104         E[] read(E)(E[] buf)
1105         {
1106             buf = _sg.read(buf);
1107             buf[] *= _gain;
1108 
1109             return buf;
1110         }
1111 
1112 
1113         F gain() const @property pure nothrow @safe @nogc { return _gain; }
1114         void gain(F g) @property pure nothrow @safe @nogc { _gain = g; }
1115 
1116 
1117       private:
1118         Sg _sg;
1119         F _gain;
1120     }
1121 
1122 
1123     return Amplifier!()(sg, gain);
1124 }
1125 
1126 ///
1127 unittest
1128 {
1129     auto arr4 = [0, 1, 2, 3].repeatStream,
1130          amped = arr4.amplifier(4);
1131 
1132     assert(amped.read(new int[4]) == [0, 4, 8, 12]);
1133     static assert(isInplaceComputableStream!(typeof(arr4)));
1134     static assert(isInplaceComputableStream!(typeof(amped), "*"));
1135     static assert(isInplaceComputableStream!(typeof(amped), "/"));
1136 
1137     auto arr = [3, 4, 1, 2];
1138     assert(amped.readOp!"*"(arr) == [0, 16, 8, 24]);
1139 }
1140 
1141 
1142 auto selector(Sg...)(Sg sgs)
1143 {
1144     static struct Selector()
1145     {
1146         import std.string : format;
1147 
1148 
1149         auto front() const @property
1150         {
1151             switch(_selectIndex){
1152               foreach(i, E; Sg)
1153                 mixin(format("case %s: return _sgs[%s].front;", i, i));
1154               default: assert(0);
1155             }
1156         }
1157 
1158 
1159         void popFront()
1160         {
1161             switch(_selectIndex){
1162               foreach(i, E; Sg)
1163                 mixin(format("case %s: _sgs[%s].popFront();", i, i));
1164               default: assert(0);
1165             }
1166         }
1167 
1168 
1169         bool empty() const @property
1170         {
1171             switch(_selectIndex){
1172               foreach(i, E; Sg)
1173                 mixin(format("case %s: return _sgs[%s].empty;", i, i));
1174               default: assert(0);
1175             }
1176         }
1177 
1178 
1179         bool fetch()
1180         {
1181             switch(_selectIndex){
1182               foreach(i, E; Sg)
1183                 mixin(format("case %s: return _sgs[%s].fetch();", i, i));
1184               default: assert(0);
1185             }
1186         }
1187 
1188 
1189         E[] read(E)(E[] buf)
1190         {
1191             switch(_selectIndex){
1192               foreach(i, E; Sg)
1193                 mixin(format("case %s: return _sgs[%s].read(buf);", i, i));
1194               default: assert(0);
1195             }
1196         }
1197 
1198 
1199         E[] readOp(alias op, E)(E[] buf)
1200         {
1201             switch(_selectIndex){
1202               foreach(i, E; Sg)
1203                 mixin(format("case %s: return _sgs[%s].readOp!op(buf);", i, i));
1204               default: assert(0);
1205             }
1206         }
1207 
1208 
1209         void select(size_t i)
1210         in{
1211             assert(i < Sg.length);
1212         }
1213         body{
1214             _selectIndex = i;
1215         }
1216 
1217 
1218         void select(size_t i)()
1219         {
1220             static assert(i < Sg.length);
1221             _selectIndex = i;
1222         }
1223 
1224 
1225       private:
1226         Sg _sgs;
1227         size_t _selectIndex;
1228     }
1229 
1230 
1231     return Selector!()(sgs, 0);
1232 }
1233 
1234 ///
1235 unittest
1236 {
1237     auto arr1 = 0.repeatStream,
1238          arr2 = [0, 1].repeatStream,
1239          arr3 = [0, 1, 2].repeatStream,
1240          arr4 = [0, 1, 2, 3].repeatStream;
1241 
1242     auto slt = selector(arr1, arr2, arr3, arr4);
1243     assert(slt.read(new int[4]) == [0, 0, 0, 0]);
1244     slt.select(1); // or slt.select!1;
1245     assert(slt.read(new int[4]) == [0, 1, 0, 1]);
1246     slt.select(2); // or slt.select!2;
1247     assert(slt.read(new int[4]) == [0, 1, 2, 0]);
1248     slt.select(3); // or slt.select!3;
1249     assert(slt.read(new int[4]) == [0, 1, 2, 3]);
1250 }
1251 
1252 
1253 /**
1254 FIRフィルタを構成します。
1255 
1256 FIRフィルタの一般形は、z変換すれば次のような式で表すことができます。
1257 H[z] = Σ(k[m]*z^(-m))
1258 
1259 この関数には、各タップの係数`k[m]`を指定することで任意のFIRフィルタを構築することができます。
1260 */
1261 auto firFilter(alias reduceFn = "a+b*c", Sg, E)(Sg sg, const E[] taps)
1262 if(isInputStream!Sg)
1263 {
1264     return firFilter!(reduceFn, Sg, Unqual!E)(sg, taps, new Unqual!E[](taps.length));
1265 }
1266 
1267 
1268 ///
1269 auto firFilter(alias reduceFn = "a+b*c", Sg, E)(Sg sg, const E[] tap, E[] buf)
1270 if(isInputStream!Sg && is(E == Unqual!E))
1271 in{
1272     assert(tap.length == buf.length);
1273 }
1274 body{
1275     static struct FIRFiltered()
1276     {
1277         E front() const @property @trusted
1278         {
1279             E a = 0;
1280 
1281             {
1282                 auto p1 = _tap[$ - 1 .. $].ptr,
1283                      p2 = _buf[_idx .. $].ptr,
1284                      e2 = _buf[$ .. $].ptr;
1285 
1286                 while(p2 != e2){
1287                     a = naryFun!reduceFn(a, *p2, *p1);
1288                     --p1;
1289                     ++p2;
1290                 }
1291             }
1292 
1293             {
1294                 auto p1 = _tap.ptr + _idx-1,
1295                      p2 = _buf[0 .. _idx].ptr,
1296                      e2 = _buf[_idx .. $].ptr;
1297 
1298                 while(p2 != e2){
1299                     a = naryFun!reduceFn(a, *p2, *p1);
1300                     --p1;
1301                     ++p2;
1302                 }
1303             }
1304 
1305             return a;
1306         }
1307 
1308 
1309         void popFront()
1310         {
1311             _buf[_idx] = _sg.front;
1312             ++_idx;
1313             _idx %= _buf.length;
1314             _sg.popFront();
1315         }
1316 
1317       static if(isInfinite!Sg)
1318         enum bool empty = false;
1319       else
1320         bool empty() const @property { return _sg.empty; }
1321 
1322 
1323         bool fetch() { return _sg.fetch(); }
1324 
1325 
1326         U[] readOp(alias op, U)(U[] buf)
1327         {
1328             auto p = buf.ptr,
1329                  e = buf[$ .. $].ptr;
1330 
1331             while(p != e && !this.empty){
1332                 binaryFunExt!op(*p, this.front);
1333                 this.popFront();
1334                 () @trusted { ++p; }();
1335             }
1336 
1337             return () @trusted { return buf[0 .. p - buf.ptr]; }();
1338         }
1339 
1340 
1341         U[] read(U)(U[] buf)
1342         {
1343             return readOp!""(buf);
1344         }
1345 
1346 
1347       private:
1348         Sg _sg;
1349         const(E)[] _tap;
1350         E[] _buf;
1351         size_t _idx;
1352     }
1353 
1354 
1355     return FIRFiltered!()(sg, tap, buf, 0);
1356 }
1357 
1358 ///
1359 unittest
1360 {
1361     auto arr = [0, 1, 2, 3].repeatStream,
1362          flt1 = arr.firFilter([0, 1]);
1363 
1364     assert(flt1.read(new int[8]) == [0, 0, 0*1+1*0, 1, 2, 3, 0, 1]);
1365 
1366     auto flt2 = arr.firFilter([1, 2]);
1367     assert(flt2.read(new int[6]) == [0, 0, 0*2+1*1, 1*2+2*1, 2*2+3*1, 3*2+0*1]);
1368 }
1369 
1370 
1371 /*
1372 IIRフィルタを構成します。
1373 
1374 IIRフィルタの一般形は、z変換すれば次のような式で表すことができます。
1375 H(z) = 1/Σ(k[m]*z^(-m))
1376 
1377 この関数には、各タップの係数`k[m]`を指定することで任意のIIRフィルタを構築することができます。
1378 */
1379 //auto iirFilter(alias reduceFn = "a+b*c", Sg, E)(Sg sg, const E[] taps)
1380 //if(isInputStream!Sg)
1381 //{
1382 //    return iirFilter!(reduceFn, Sg, Unqual!E)(sg, taps, new Unqual!E()(taps.length));
1383 //}
1384 
1385 /////
1386 //auto iirFilter(alias reduceFn = "a+b*c", Sg, E)(Sg sg, const E[] taps, E[] buf)
1387 //if(isInputStream!Sg && is(E == Unqual!E))
1388 //{
1389 
1390 //}
1391 
1392 
1393 /**
1394 信号の絶対値の最大値を`limit`にするように線形に振幅を小さく、もしくは大きくします。
1395 */
1396 auto normalizer(Sg, E)(Sg sg, E limit)
1397 if(isFloatingPoint!E)
1398 {
1399     static struct Normalizer()
1400     {
1401         E front() @property
1402         {
1403             auto f = _sg.front,
1404                  a = abs(f);
1405 
1406             if(f == 0) return 0;
1407 
1408             if(a > _max){
1409                 _max = a;
1410 
1411                 return _lim;
1412             }
1413 
1414             return f / _max * _lim;
1415         }
1416 
1417 
1418         auto empty() const @property { return _sg.empty; }
1419         void popFront() { _sg.popFront(); }
1420 
1421         bool fetch() { return _sg.fetch(); }
1422 
1423 
1424         U[] read(U)(U[] buf)
1425         {
1426             buf = _sg.read(buf);
1427 
1428             auto p = buf.ptr,
1429                  e = buf[$ .. $].ptr;
1430 
1431             while(p != e){
1432                 immutable v = abs(*p);
1433                 if(v == 0)
1434                     *p = 0;
1435                 else{
1436                     if(v > _max)
1437                         _max = v;
1438 
1439                     *p *= _lim / _max;
1440                 }
1441 
1442                 () @trusted { ++p; }();
1443             }
1444 
1445             return buf;
1446         }
1447 
1448 
1449         U[] readOp(alias op : "", U)(U[] buf)
1450         {
1451             return this.read(buf);
1452         }
1453 
1454 
1455         E[] readOp(alias op, E)(E[] buf)
1456         {
1457             auto p = buf.ptr,
1458                  e = buf[$ .. $].ptr;
1459 
1460             while(p != e && !this.empty){
1461                 binaryFunExt!op(*p, this.front);
1462                 this.popFront();
1463                 () @trusted { ++p; }();
1464             }
1465 
1466             return buf;
1467         }
1468 
1469 
1470       private:
1471         Sg _sg;
1472         E _lim;
1473         E _max;
1474     }
1475 
1476     return Normalizer!()(sg, limit, 0);
1477 }
1478 
1479 ///
1480 unittest
1481 {
1482     auto arr = [0, 1, -2, 3].repeatStream,
1483          nlz = arr.normalizer(1.5);
1484 
1485     assert(equal!approxEqual(nlz.read(new float[8]), [0, 1.5, -1.5, 1.5, 0, 0.5, -1.0, 1.5]));
1486 }