How to use the @closure decorator for stream processing
TinyChain lets developers write code that mostly looks like regular Python but executes in a distributed, concurrent runtime. One important difference comes up when iterating over a Map, Tuple, or Stream (analogous to a Python dict, tuple, or generator): TinyChain handles these cases using functional programming, with the filter, fold, for_each, and map methods. A simple example is using map to create a new Tuple from an existing Tuple:
    import os
    import tinychain as tc

    HOST = tc.host.Host(os.getenv("TC_HOST", "http://127.0.0.1:8702"))
    ENDPOINT = "/transact/hypothetical"

    # initialize a new execution context
    cxt = tc.Context()

    # instantiate a Tuple
    cxt.tuple = tc.Tuple([1, 2, 3])

    @tc.get_op
    def pow(x: tc.Number):
        return x**2

    # create a new Tuple by squaring the elements in `cxt.Tuple`
    cxt.raised = cxt.tuple.map(pow)

    if __name__ == "__main__":
        # check that the implementation works as expected
        assert HOST.post(ENDPOINT, cxt) == [1, 4, 9]
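For readers mapping these four methods onto ordinary Python, the sketch below shows rough local equivalents using built-ins and functools.reduce. It is a plain-Python analogy only and does not call TinyChain. The exact signatures of TinyChain's filter, fold, and for_each are not shown in this article, so nothing here should be read as describing them.

```python
from functools import reduce

values = (1, 2, 3)

# map: build a new tuple by transforming each element
squared = tuple(map(lambda x: x**2, values))

# filter: keep only the elements that satisfy a predicate
odd = tuple(filter(lambda x: x % 2 == 1, values))

# fold: combine all elements into one accumulated value
total = reduce(lambda acc, x: acc + x, values, 0)

# for_each: run a side effect once per element, discarding any results
seen = []
for x in values:
    seen.append(x)

print(squared, odd, total, seen)
```

The squared result matches what the TinyChain example above asserts for cxt.raised, except that the local version is computed eagerly in the interpreter rather than by a host.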
Often the function applied to the stream needs to reference state from the calling context. You can do this using a closure:
    # ...
    cxt.tuple = tc.Tuple([1, 2, 3])
    cxt.exponent = 2

    # capture `cxt.exponent` from the outer context
    @tc.closure(cxt.exponent)
    @tc.get_op
    def pow(x: tc.Number):
        return x**cxt.exponent

    cxt.raised = cxt.tuple.map(pow)
    # ...
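The idea of capturing outer state can be illustrated in plain Python, where an inner function captures variables from its enclosing scope automatically. The sketch below is an analogy, not TinyChain code, and make_pow and pow_op are hypothetical names introduced for illustration. In TinyChain the captured state is named explicitly in the @tc.closure decorator, presumably because the op is executed by the runtime rather than by the local interpreter.

```python
def make_pow(exponent):
    """Return a function that raises its argument to a captured `exponent`."""

    def pow_op(x):
        # `exponent` comes from the enclosing call to make_pow,
        # much as `cxt.exponent` is captured from the outer context
        return x**exponent

    return pow_op


# hypothetical usage: capture exponent=2, then map over a tuple
square = make_pow(2)
raised = tuple(map(square, (1, 2, 3)))
print(raised)
```

Passing the captured value as an explicit argument to make_pow, rather than relying on a global, mirrors the way the decorator lists exactly which state the op depends on.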
Collection types like Table and Tensor all support copying from and into a Stream. For example, you can create a Tensor by reading fields from a Table:
    # ...
    # initialize a new execution context
    cxt = tc.Context()

    # initialize a new table
    key = [tc.Column("order_id", tc.U64)]
    values = [tc.Column("price", tc.U64)]
    schema = tc.table.Schema(key, values)
    cxt.table = tc.table.Table(schema)

    # add a row
    cxt.place_order = cxt.table.insert([1], [499])

    # convert to a tensor
    schema = [[1], tc.U64]
    cxt.prices = tc.tensor.Dense.copy_from(schema, cxt.table.select(["price"]).rows())
    cxt.result = tc.After(cxt.place_order, cxt.prices)

    if __name__ == "__main__":
        # check that the implementation works as expected
        print(HOST.post(ENDPOINT, cxt))
Another common use case for Stream is as a replacement for a for loop. For example: