Closures and functional programming
How to use the @closure decorator for stream processing
TinyChain lets developers write code that mostly looks like regular Python but executes in a distributed, concurrent runtime. One important difference arises when iterating over a Map, Tuple, or Stream (analogous to a Python dict, tuple, or generator). TinyChain handles these cases with functional programming, using the filter, fold, for_each, and map methods. A simple example uses map to create a new Tuple from an existing Tuple:
import os
import tinychain as tc

HOST = tc.host.Host(os.getenv("TC_HOST", "http://127.0.0.1:8702"))
ENDPOINT = "/transact/hypothetical"

# initialize a new execution context
cxt = tc.Context()

# instantiate a Tuple
cxt.tuple = tc.Tuple([1, 2, 3])

@tc.get_op
def pow(x: tc.Number):
    return x**2

# create a new Tuple by squaring the elements in `cxt.tuple`
cxt.raised = cxt.tuple.map(pow)

if __name__ == "__main__":
    # check that the implementation works as expected
    assert HOST.post(ENDPOINT, cxt) == [1, 4, 9]
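For readers more familiar with eager Python, the four methods named above correspond roughly to the built-in idioms below. This is a plain-Python analogy only, not TinyChain code: it runs locally and eagerly, and the names `square`, `is_odd`, and `seen` are made up for illustration.

```python
from functools import reduce

items = (1, 2, 3)


def square(x):
    return x ** 2


def is_odd(x):
    return x % 2 == 1


# map: apply a function to every element
mapped = tuple(map(square, items))

# filter: keep only the elements for which the predicate is true
filtered = tuple(filter(is_odd, items))

# fold: combine the elements into a single value, starting from an initial one
folded = reduce(lambda total, x: total + x, items, 0)

# for_each: run a function on every element purely for its side effect
seen = []
for item in items:
    seen.append(item)

print(mapped, filtered, folded, seen)
```

The point of the comparison is only the shape of each operation: a function is handed to the collection rather than written as the body of a loop.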
Often the function applied to the stream needs to reference state from the calling context. You can do this using a closure:
# ...
cxt.tuple = tc.Tuple([1, 2, 3])
cxt.exponent = 2

# capture `cxt.exponent` from the outer context
@tc.closure(cxt.exponent)
@tc.get_op
def pow(x: tc.Number):
    return x**cxt.exponent

cxt.raised = cxt.tuple.map(pow)
# ...
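In plain Python, a nested function captures variables from its enclosing scope implicitly. The sketch below shows that idea eagerly and without TinyChain. In the example above, by contrast, the captured state is listed explicitly in the `@tc.closure(...)` decorator. The names `make_pow` and `pow_` are hypothetical.

```python
def make_pow(exponent):
    # `exponent` lives in the enclosing scope; the inner function captures it
    def pow_(x):
        return x ** exponent

    return pow_


square = make_pow(2)
cube = make_pow(3)

# each closure remembers the exponent it was created with
raised = tuple(map(square, (1, 2, 3)))
cubed = tuple(map(cube, (1, 2, 3)))

print(raised, cubed)
```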
Collection types like Table and Tensor all support copying from and into a Stream. For example, you can create a Tensor by reading fields from a Table:
# ...
# initialize a new execution context
cxt = tc.Context()

# initialize a new table
key = [tc.Column("order_id", tc.U64)]
values = [tc.Column("price", tc.U64)]
schema = tc.table.Schema(key, values)
cxt.table = tc.table.Table(schema)

# add a row
cxt.place_order = cxt.table.insert([1], [499])

# convert to a tensor
schema = [[1], tc.U64]
cxt.prices = tc.tensor.Dense.copy_from(schema, cxt.table.select(["price"]).rows())
cxt.result = tc.After(cxt.place_order, cxt.prices)

if __name__ == "__main__":
    # check that the implementation works as expected
    print(HOST.post(ENDPOINT, cxt))
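As a conceptual picture of what it means to stream rows into a fixed-shape collection, here is a small plain-Python sketch. It is not TinyChain code and says nothing about how TinyChain implements the copy. The dict-based table and the helpers `select_rows` and `dense_from_rows` are hypothetical stand-ins.

```python
# hypothetical in-memory stand-in for a table keyed by order_id
table = {1: {"price": 499}}


def select_rows(table, columns):
    """Lazily yield one list per row, containing only the requested columns."""
    for _key, row in sorted(table.items()):
        yield [row[name] for name in columns]


def dense_from_rows(shape, rows):
    """Flatten a stream of rows into a list and check it against `shape`."""
    flat = [value for row in rows for value in row]

    expected = 1
    for dim in shape:
        expected *= dim

    if len(flat) != expected:
        raise ValueError(f"expected {expected} elements, found {len(flat)}")

    return flat


prices = dense_from_rows([1], select_rows(table, ["price"]))
print(prices)
```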
Another common use of a Stream is to take the place of a for loop. For example:
cxt = tc.Context()
cxt.tensor = tc.tensor.Dense.constant([3], 1)

@tc.closure(cxt.tensor)
@tc.get_op
def pow(i: tc.UInt):
    return cxt.tensor[i].write(i**2)

cxt.update = tc.After(tc.Stream.range(cxt.tensor.size).for_each(pow), cxt.tensor)
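For comparison, the eager loop that this Stream stands in for might look like the following in plain Python. The sketch assumes an ordinary list as a stand-in for the one-dimensional tensor and is not TinyChain code.

```python
# stand-in for a one-dimensional tensor of three ones
tensor = [1, 1, 1]

# overwrite each element with the square of its index
for i in range(len(tensor)):
    tensor[i] = i ** 2

print(tensor)
```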

Examples

You can find more complex examples of functional programming in the codebase: