TinyChain Platform
  • What is TinyChain?
  • Guides
    • Getting Started
    • Python client introduction
    • Types and casting
    • Anatomy of an Op
    • Flow control: after, cond, while_loop
    • Closures and functional programming
    • Install a Library or Service
    • Code a compute graph
    • Debugging
    • Ask for help
  • Fundamentals
    • Install TinyChain
    • Technical Details
  • Advanced
    • Client-side Class Inheritance
  • Use Cases
    • For Engineers
    • For Data Scientists
    • For DevOps
    • For Product Owners & Executives
    • For End-Users
Powered by GitBook
On this page
  1. Guides

Closures and functional programming

How to use the @closure decorator for stream processing

PreviousFlow control: after, cond, while_loopNextInstall a Library or Service

Last updated 2 years ago

TinyChain lets developers write code that mostly looks like regular Python, but executes in a distributed concurrent runtime. One important difference comes up when iterating over a Map, Tuple, or Stream (analogous to a Python dict, tuple, or generator). TinyChain handles these cases using with the filter, fold, for_each, and map methods. An easy example is using map to create a new Tuple based on an existing Tuple:

import os
import tinychain as tc

HOST = tc.host.Host(os.getenv("TC_HOST", "http://127.0.0.1:8702"))
ENDPOINT = "/transact/hypothetical"

# initialize a new execution context
cxt = tc.Context()

# instantiate a Tuple
cxt.tuple = tc.Tuple([1, 2, 3])

@tc.get_op
def pow(x: tc.Number):
    return x**2

# create a new Tuple by squaring the elements in `cxt.Tuple`
cxt.raised = cxt.tuple.map(pow)

if __name__ == "__main__":
    # check that the implementation works as expected
    assert HOST.post(ENDPOINT, cxt) == [1, 4, 9]

Often in these cases it's necessary to reference some state in the calling context in the function applied to the stream. You can do this using a closure:

# ...
cxt.tuple = tc.Tuple([1, 2, 3])
cxt.exponent = 2

# capture `cxt.exponent` from the outer context
@tc.closure(cxt.exponent)
@tc.get_op
def pow(x: tc.Number):
    return x**cxt.exponent

cxt.raised = cxt.tuple.map(pow)
# ...

Collection types like Table and Tensor all support copying from and into a Stream. For example, you can create a Tensor by reading fields from a Table:

# ...
# initialize a new execution context
cxt = tc.Context()

# initialize a new table
key = [tc.Column("order_id", tc.U64)]
values = [tc.Column("price", tc.U64)]
schema = tc.table.Schema(key, values)
cxt.table = tc.table.Table(schema)

# add a row
cxt.place_order = cxt.table.insert([1], [499])

# convert to a tensor
schema = [[1], tc.U64]
cxt.prices = tc.tensor.Dense.copy_from(schema, cxt.table.select(["price"]).rows())
cxt.result = tc.After(cxt.place_order, cxt.prices)

if __name__ == "__main__":
    # check that the implementation works as expected
    print(HOST.post(ENDPOINT, cxt))

Another common use-case for Stream is in place of a for loop. For example:

cxt = tc.Context()
cxt.tensor = tc.tensor.Dense.constant([3], 1)

@tc.closure(cxt.tensor)
@tc.get_op
def pow(i: tc.UInt):
    return cxt.tensor[i].write(i**2)

cxt.update = tc.After(tc.Stream.range(cxt.tensor.size).for_each(pow), cxt.tensor)

Examples

You can find more complex examples of functional programming in the codebase:

the

functional programming
Tensor client tests
Table