C# - Async throttling using AsyncCollection or BufferBlock from the TPL in .NET
I want to read a stream and buffer its output so that a consumer can start reading before the producer has finished reading the whole stream. For example, read an HTTP stream and forward ... I am using the .NET 4.5 TPL library. I found Nito's AsyncCollection, and below is what I wrote. I am wondering whether it is correct, because when I debug it, both with a long string as a test and inside the ASP.NET vNext pipeline, ReadFrom and WriteTo run in sequence... How can I be sure it behaves correctly in non-debug mode?
    using Nito.AsyncEx;
    using System;
    using System.Collections.Concurrent;
    using System.IO;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    namespace Absyla.Core
    {
        public class PipeStream
        {
            // Bounded collection: at most 1 chunk is buffered at a time.
            private AsyncCollection<byte[]> _chunks = new AsyncCollection<byte[]>(1);

            // Producer: reads the source stream chunk by chunk into the collection.
            public async Task ReadFrom(Stream s)
            {
                try
                {
                    byte[] buffer;
                    int bytesRead = 0;
                    int totalBytesRead = 0;
                    do
                    {
                        // Chunk size of 1 byte, used here to exercise the throttling.
                        long buffSize = Math.Min(s.Length - totalBytesRead, 1L);
                        buffer = new byte[buffSize];
                        bytesRead = await s.ReadAsync(buffer, 0, buffer.Length);
                        if (bytesRead > 0)
                        {
                            int readId = System.Environment.CurrentManagedThreadId; // for debugging only
                            await _chunks.AddAsync(buffer);
                            totalBytesRead += bytesRead;
                        }
                    } while (bytesRead > 0);
                }
                finally
                {
                    // Signal the consumer that no more chunks will arrive.
                    _chunks.CompleteAdding();
                }
            }

            // Consumer: writes buffered chunks to the target stream until adding is complete.
            public async Task WriteTo(Stream s)
            {
                while (await _chunks.OutputAvailableAsync())
                {
                    int writeId = System.Environment.CurrentManagedThreadId; // for debugging only
                    byte[] buffer = await _chunks.TakeAsync();
                    await s.WriteAsync(buffer, 0, buffer.Length);
                }
            }
        }
    }
Here is the test:
    [Fact]
    public async Task ShouldBufferLittleString()
    {
        string testString = "test in";
        PipeStream ps = new PipeStream();
        MemoryStream ms2 = new MemoryStream();

        await ps.ReadFrom(testString.AsStream());
        await ps.WriteTo(ms2);

        string result = ms2.ReadFromStart();
        result.ShouldBe(testString);
    }
Note: AsStream() is an extension method that returns a Stream object from a string.
As I found this so simple, I wonder whether it is correct and whether it does the job I want...
Update: changing the following:
    private AsyncCollection<byte[]> _chunks = new AsyncCollection<byte[]>();
to
    private AsyncCollection<byte[]> _chunks = new AsyncCollection<byte[]>(2);
made the throttling work. However, the test now blocks, and it only works when I write it like this:
    public async Task ShouldBufferLittleString()
    {
        string testString = "test in";
        PipeStream ps = new PipeStream();
        MemoryStream ms2 = new MemoryStream();

        // Start both tasks first, then await them together.
        var readTask = ps.ReadFrom(testString.AsStream());
        var writeTask = ps.WriteTo(ms2);
        await Task.WhenAll(readTask, writeTask);

        string result = ms2.ReadFromStart();
        result.ShouldBe(testString);
    }
But in my code I cannot hold a reference to both the read and the write task, because they can be called from different locations in the code.
I tried changing to BufferBlock, but had the same issue.
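For reference, a BufferBlock-based variant might look like the sketch below. This is not the code from the original attempt, which is not shown; the class name `BufferBlockPipe` and the 4096-byte chunk size are assumptions made for illustration. It uses `BoundedCapacity = 1`, so `SendAsync` does not complete while the block is full, which is the same throttling behaviour as a bounded AsyncCollection.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical BufferBlock-based counterpart of PipeStream (illustrative name).
public class BufferBlockPipe
{
    // BoundedCapacity = 1: the block holds at most one chunk at a time.
    private readonly BufferBlock<byte[]> _chunks =
        new BufferBlock<byte[]>(new DataflowBlockOptions { BoundedCapacity = 1 });

    // Producer: reads the source stream and sends each chunk to the block.
    public async Task ReadFrom(Stream s)
    {
        try
        {
            var buffer = new byte[4096]; // chunk size is an assumption
            int bytesRead;
            while ((bytesRead = await s.ReadAsync(buffer, 0, buffer.Length)) > 0)
            {
                // Copy only the bytes actually read into a right-sized chunk.
                var chunk = new byte[bytesRead];
                Buffer.BlockCopy(buffer, 0, chunk, 0, bytesRead);

                // Waits asynchronously while the block is full.
                await _chunks.SendAsync(chunk);
            }
        }
        finally
        {
            // Tell the consumer that no more chunks will arrive.
            _chunks.Complete();
        }
    }

    // Consumer: drains the block into the target stream until it is completed and empty.
    public async Task WriteTo(Stream s)
    {
        while (await _chunks.OutputAvailableAsync())
        {
            byte[] chunk = await _chunks.ReceiveAsync();
            await s.WriteAsync(chunk, 0, chunk.Length);
        }
    }
}
```

As with the AsyncCollection version, `ReadFrom` and `WriteTo` need to be running at the same time whenever the input spans more than one chunk; otherwise the producer waits for room that no consumer ever frees.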
Update 2: I changed the PipeStream code a little to add throttling and a buffer of 1 for the test.
In the test, when I await each call directly, it blocks; when I create the tasks first and await them afterwards, the test passes.
    // This blocks:
    await ps.ReadFrom(testString.AsStream());
    await ps.WriteTo(ms2);

    // This passes:
    var read = ps.ReadFrom(testString.AsStream());
    var write = ps.WriteTo(ms2);
    await read;
    await write;
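A likely explanation for the difference can be reproduced in isolation. The sketch below uses a `BufferBlock<int>` with `BoundedCapacity = 1` as a stand-in for the bounded collection, assuming a full bounded AsyncCollection makes `AddAsync` wait in the same way; the names `BoundedBufferDemo` and `ProducerFinishesAlone` are hypothetical. It starts a producer with no consumer attached and reports whether the producer managed to finish on its own.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BoundedBufferDemo
{
    // Returns true if the producer completed before any consumer started.
    public static async Task<bool> ProducerFinishesAlone(int itemCount)
    {
        var buffer = new BufferBlock<int>(
            new DataflowBlockOptions { BoundedCapacity = 1 });

        // Producer: each SendAsync waits until the block has room.
        Func<Task> produce = async () =>
        {
            for (int i = 0; i < itemCount; i++)
            {
                await buffer.SendAsync(i);
            }
            buffer.Complete();
        };
        Task producer = produce();

        // Give the producer time to run while nothing is consuming.
        await Task.Delay(200);
        bool finishedAlone = producer.IsCompleted;

        // Now drain the block so the producer can make progress and finish.
        while (await buffer.OutputAvailableAsync())
        {
            await buffer.ReceiveAsync();
        }
        await producer;

        return finishedAlone;
    }
}
```

With a single item the producer fits entirely in the buffer and finishes alone; with more items than the capacity it cannot finish until something consumes. Awaiting the producer before starting the consumer therefore never returns, while starting both and awaiting afterwards lets them interleave. If the two calls live in different places, both still have to be started before either one is awaited to completion.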