diff --git a/design/mvp/Async.md b/design/mvp/Async.md index e2301c3b..e948b741 100644 --- a/design/mvp/Async.md +++ b/design/mvp/Async.md @@ -772,9 +772,15 @@ Canonical ABI Explainer. ## Examples -With that background, we can sketch the shape of an async component that lifts -and lowers its imports and exports with `async`. The meat of this component is -replaced with `...` to focus on the overall flow of function calls. +For a list of working examples expressed as executable WebAssembly Test (WAST) +files, see [this directory](../../test/async). + +This rest of this section sketches the shape of a component that uses `async` +to lift and lower its imports and exports with both the stackful and stackless +ABI options. + +Starting with the stackless ABI, the meat of this example component is replaced +with `...` to focus on the overall flow of function calls: ```wat (component (import "fetch" (func $fetch (param "url" string) (result (list u8)))) diff --git a/test/README.md b/test/README.md new file mode 100644 index 00000000..a14bee8d --- /dev/null +++ b/test/README.md @@ -0,0 +1,10 @@ +# Reference Tests + +This directory contains Component Model reference tests, grouped by functionality. + +## Running in Wasmtime + +A single `.wast` test can be run via: +``` +wasmtime wast -W component-model-async=y the-test.wast +``` diff --git a/test/async/async-calls-sync.wast b/test/async/async-calls-sync.wast new file mode 100644 index 00000000..2f4da2e4 --- /dev/null +++ b/test/async/async-calls-sync.wast @@ -0,0 +1,259 @@ +;; This test contains 3 components, $AsyncInner, $SyncMiddle and $AsyncOuter, +;; where there are two instances of $SyncMiddle that import a single instance +;; of $AsyncInner, and $AsyncOuter imports all 3 preceding instances. +;; +;; $AsyncOuter.run asynchronously calls $SyncMiddle.sync-func twice concurrently +;; in each instance (4 total calls), hitting the synchronous backpressure case +;; in 2 of the 4 calls. +;; +;; $SyncMiddle.sync-func makes a blocking call to $AsyncInner.blocking-call +;; which is used to emulate a host call that blocks until $AsyncOuter.run +;; calls $AsyncInner.unblock to unblock all the 'blocking-call' calls. +(component + (component $AsyncInner + (core module $CoreAsyncInner + (import "" "context.set" (func $context.set (param i32))) + (import "" "context.get" (func $context.get (result i32))) + (import "" "task.return0" (func $task.return0)) + (import "" "task.return1" (func $task.return1 (param i32))) + + (memory 1) + (global $blocked (mut i32) (i32.const 1)) + (global $counter (mut i32) (i32.const 2)) + + ;; 'blocking-call' cooperatively "spin-waits" until $blocked is 0. + (func $blocking-call (export "blocking-call") (result i32) + (call $context.set (global.get $counter)) + (global.set $counter (i32.add (i32.const 1) (global.get $counter))) + (i32.const 1 (; YIELD ;)) + ) + (func $blocking-call-cb (export "blocking-call-cb") (param i32 i32 i32) (result i32) + (if (i32.eqz (global.get $blocked)) (then + (call $task.return1 (call $context.get)) + (return (i32.const 0 (; EXIT ;))) + )) + (i32.const 1 (; YIELD ;)) + ) + (func $unblock (export "unblock") (result i32) + (global.set $blocked (i32.const 0)) + (call $task.return0) + (i32.const 0 (; EXIT ;)) + ) + (func $unblock-cb (export "unblock-cb") (param i32 i32 i32) (result i32) + unreachable + ) + ) + (canon task.return (core func $task.return0)) + (canon task.return (result u32) (core func $task.return1)) + (canon context.set i32 0 (core func $context.set)) + (canon context.get i32 0 (core func $context.get)) + (core instance $core_async_inner (instantiate $CoreAsyncInner (with "" (instance + (export "task.return0" (func $task.return0)) + (export "task.return1" (func $task.return1)) + (export "context.set" (func $context.set)) + (export "context.get" (func $context.get)) + )))) + (func (export "blocking-call") (result u32) (canon lift + (core func $core_async_inner "blocking-call") + async (callback (func $core_async_inner "blocking-call-cb")) + )) + (func (export "unblock") (canon lift + (core func $core_async_inner "unblock") + async (callback (func $core_async_inner "unblock-cb")) + )) + ) + + (component $SyncMiddle + (import "blocking-call" (func $blocking-call (result u32))) + (core module $CoreSyncMiddle + (import "" "blocking-call" (func $blocking-call (result i32))) + (func $sync-func (export "sync-func") (result i32) + (call $blocking-call) + ) + ) + (canon lower (func $blocking-call) (core func $blocking-call')) + (core instance $core_sync_middle (instantiate $CoreSyncMiddle (with "" (instance + (export "blocking-call" (func $blocking-call')) + )))) + (func (export "sync-func") (result u32) (canon lift + (core func $core_sync_middle "sync-func") + )) + ) + + (component $AsyncMiddle + (import "blocking-call" (func $blocking-call (result u32))) + (core module $CoreSyncMiddle + (import "" "task.return" (func $task.return (param i32))) + (import "" "blocking-call" (func $blocking-call (result i32))) + (func $sync-func (export "sync-func") (result i32) + (call $task.return (call $blocking-call)) + (i32.const 0 (; EXIT ;)) + ) + (func $sync-func-cb (export "sync-func-cb") (param i32 i32 i32) (result i32) + unreachable + ) + ) + (canon task.return (result u32) (core func $task.return)) + (canon lower (func $blocking-call) (core func $blocking-call')) + (core instance $core_sync_middle (instantiate $CoreSyncMiddle (with "" (instance + (export "task.return" (func $task.return)) + (export "blocking-call" (func $blocking-call')) + )))) + (func (export "sync-func") (result u32) (canon lift + (core func $core_sync_middle "sync-func") + async (callback (func $core_sync_middle "sync-func-cb")) + )) + ) + + (component $AsyncOuter + (import "unblock" (func $unblock)) + (import "sync-func1" (func $sync-func1 (result u32))) + (import "sync-func2" (func $sync-func2 (result u32))) + + (core module $Memory (memory (export "mem") 1)) + (core instance $memory (instantiate $Memory)) + (core module $CoreAsyncOuter + (import "" "mem" (memory 1)) + (import "" "task.return" (func $task.return (param i32))) + (import "" "subtask.drop" (func $subtask.drop (param i32))) + (import "" "waitable.join" (func $waitable.join (param i32 i32))) + (import "" "waitable-set.new" (func $waitable-set.new (result i32))) + (import "" "unblock" (func $unblock)) + (import "" "sync-func1" (func $sync-func1 (param i32 i32) (result i32))) + (import "" "sync-func2" (func $sync-func2 (param i32 i32) (result i32))) + + (global $ws (mut i32) (i32.const 0)) + (func $start (global.set $ws (call $waitable-set.new))) + (start $start) + + (global $remain (mut i32) (i32.const -1)) + + (func $run (export "run") (result i32) + (local $ret i32) + + ;; call 'sync-func1' and 'sync-func2' asynchronously, both of which will block + ;; (on $AsyncInner.blocking-call). because 'sync-func1/2' are in different instances, + ;; both calls will reach the STARTED state. + (local.set $ret (call $sync-func1 (i32.const 0xdeadbeef) (i32.const 8))) + (if (i32.ne (i32.const 0x21 (; STARTED=1 | (subtask=2 << 4) ;)) (local.get $ret)) + (then unreachable)) + (call $waitable.join (i32.const 2) (global.get $ws)) + (local.set $ret (call $sync-func2 (i32.const 0xdeadbeef) (i32.const 12))) + (if (i32.ne (i32.const 0x31 (; STARTED=1 | (subtask=3 << 4) ;)) (local.get $ret)) + (then unreachable)) + (call $waitable.join (i32.const 3) (global.get $ws)) + + ;; now start another pair of 'sync-func1/2' calls, both of which should see auto + ;; backpressure and get stuck in the STARTING state. + (local.set $ret (call $sync-func1 (i32.const 0xdeadbeef) (i32.const 16))) + (if (i32.ne (i32.const 0x40 (; STARTING=0 | (subtask=4 << 4) ;)) (local.get $ret)) + (then unreachable)) + (call $waitable.join (i32.const 4) (global.get $ws)) + (local.set $ret (call $sync-func2 (i32.const 0xdeadbeef) (i32.const 20))) + (if (i32.ne (i32.const 0x50 (; STARTING=0 | (subtask=5 << 4) ;)) (local.get $ret)) + (then unreachable)) + (call $waitable.join (i32.const 5) (global.get $ws)) + + ;; this POLL should return that nothing is ready + (i32.or (i32.const 3 (; POLL ;)) (i32.shl (global.get $ws) (i32.const 4))) + ) + (func $run-cb (export "run-cb") (param $event_code i32) (param $index i32) (param $payload i32) (result i32) + (local $ret i32) + + ;; $remain is initially -1, so confirm that POLL found nothing was ready and then + ;; unblock all the subtasks and set $remain to 4 to count how many to wait for. + (if (i32.eq (global.get $remain) (i32.const -1)) (then + (if (i32.ne (local.get $event_code) (i32.const 0 (; NONE ;))) + (then unreachable)) + (call $unblock) + (global.set $remain (i32.const 4)) + (return (i32.or (i32.const 2 (; WAIT ;)) (i32.shl (global.get $ws) (i32.const 4)))) + )) + + ;; confirm we only receive SUBTASK events after the first NONE event. + (if (i32.ne (local.get $event_code) (i32.const 1 (; SUBTASK ;))) + (then unreachable)) + + ;; if we receive a SUBTASK STARTED event, it should only be for the 3rd or + ;; 4th subtask (at indices 4/5, resp), so keep waiting for completion + (if (i32.eq (local.get $payload) (i32.const 1 (; STARTED ;))) (then + (if (i32.and + (i32.ne (local.get $index) (i32.const 4)) + (i32.ne (local.get $index) (i32.const 5))) + (then unreachable)) + (return (i32.or (i32.const 2 (; WAIT ;)) (i32.shl (global.get $ws) (i32.const 4)))) + )) + + ;; when we receive a SUBTASK RETURNED event, check the return value is equal to the + ;; subtask index (which we've ensured by having $AsyncInner.$counter start at 2, the + ;; first subtask index. The address of the return buffer is the index*4. + (if (i32.ne (local.get $payload) (i32.const 2 (; RETURNED ;))) + (then unreachable)) + (if (i32.ne (local.get $index) (i32.load (i32.mul (local.get $index) (i32.const 4)))) + (then unreachable)) + + ;; decrement $remain and exit if 0 + (call $subtask.drop (local.get $index)) + (global.set $remain (i32.sub (global.get $remain) (i32.const 1))) + (if (i32.gt_u (global.get $remain) (i32.const 0)) (then + (return (i32.or (i32.const 2 (; WAIT ;)) (i32.shl (global.get $ws) (i32.const 4)))) + )) + (call $task.return (i32.const 42)) + (i32.const 0 (; EXIT ;)) + ) + ) + (canon task.return (result u32) (core func $task.return)) + (canon subtask.drop (core func $subtask.drop)) + (canon waitable.join (core func $waitable.join)) + (canon waitable-set.new (core func $waitable-set.new)) + (canon lower (func $unblock) (core func $unblock)) + (canon lower (func $sync-func1) async (memory $memory "mem") (core func $sync-func1')) + (canon lower (func $sync-func2) async (memory $memory "mem") (core func $sync-func2')) + (core instance $em (instantiate $CoreAsyncOuter (with "" (instance + (export "mem" (memory $memory "mem")) + (export "task.return" (func $task.return)) + (export "subtask.drop" (func $subtask.drop)) + (export "waitable.join" (func $waitable.join)) + (export "waitable-set.new" (func $waitable-set.new)) + (export "unblock" (func $unblock)) + (export "sync-func1" (func $sync-func1')) + (export "sync-func2" (func $sync-func2')) + )))) + (func (export "run") (result u32) (canon lift + (core func $em "run") + async (callback (func $em "run-cb")) + )) + ) + + ;; run1 uses $SyncMiddle + (instance $async_inner1 (instantiate $AsyncInner)) + (instance $sync_middle11 (instantiate $SyncMiddle + (with "blocking-call" (func $async_inner1 "blocking-call")) + )) + (instance $sync_middle12 (instantiate $SyncMiddle + (with "blocking-call" (func $async_inner1 "blocking-call")) + )) + (instance $async_outer1 (instantiate $AsyncOuter + (with "unblock" (func $async_inner1 "unblock")) + (with "sync-func1" (func $sync_middle11 "sync-func")) + (with "sync-func2" (func $sync_middle12 "sync-func")) + )) + (func (export "run1") (alias export $async_outer1 "run")) + + ;; run2 uses $AsyncMiddle + (instance $async_inner2 (instantiate $AsyncInner)) + (instance $sync_middle21 (instantiate $SyncMiddle + (with "blocking-call" (func $async_inner2 "blocking-call")) + )) + (instance $sync_middle22 (instantiate $AsyncMiddle + (with "blocking-call" (func $async_inner2 "blocking-call")) + )) + (instance $async_outer2 (instantiate $AsyncOuter + (with "unblock" (func $async_inner2 "unblock")) + (with "sync-func1" (func $sync_middle21 "sync-func")) + (with "sync-func2" (func $sync_middle22 "sync-func")) + )) + (func (export "run2") (alias export $async_outer2 "run")) +) +(assert_return (invoke "run1") (u32.const 42)) +(assert_return (invoke "run2") (u32.const 42)) diff --git a/test/async/cancel-stream.wast b/test/async/cancel-stream.wast new file mode 100644 index 00000000..7f69840d --- /dev/null +++ b/test/async/cancel-stream.wast @@ -0,0 +1,203 @@ +;; This test contains two components $C and $D that test cancelling reads +;; and writes in the presence and absence of partial reads/writes. +;; +;; $C exports a function 'start-stream' that creates and holds onto a writable +;; stream in the global $sw as well as various operations that operate on $sw. +;; $D calls $C.start-stream to get the readable end and then drives the test. +(component + (component $C + (core module $Memory (memory (export "mem") 1)) + (core instance $memory (instantiate $Memory)) + (core module $CM + (import "" "mem" (memory 1)) + (import "" "task.return" (func $task.return (param i32))) + (import "" "stream.new" (func $stream.new (result i64))) + (import "" "stream.write" (func $stream.write (param i32 i32 i32) (result i32))) + (import "" "stream.cancel-write" (func $stream.cancel-write (param i32) (result i32))) + (import "" "stream.close-writable" (func $stream.close-writable (param i32))) + + (global $sw (mut i32) (i32.const 0)) + + (func $start-stream (export "start-stream") (result i32) + ;; create a new stream, return the readable end to the caller + (local $ret64 i64) + (local.set $ret64 (call $stream.new)) + (global.set $sw (i32.wrap_i64 (i64.shr_u (local.get $ret64) (i64.const 32)))) + (i32.wrap_i64 (local.get $ret64)) + ) + + (func $write4 (export "write4") + ;; write 6 bytes into the stream, expecting to rendezvous with a stream.read + (local $ret i32) + (i32.store (i32.const 8) (i32.const 0xabcd)) + (local.set $ret (call $stream.write (global.get $sw) (i32.const 8) (i32.const 4))) + (if (i32.ne (i32.const 0x40 (; COMPLETED=0 | (4<<4) ;)) (local.get $ret)) + (then unreachable)) + ) + + (func $write4-and-close (export "write4-and-close") + (call $write4) + (call $stream.close-writable (global.get $sw)) + ) + + (func $start-blocking-write (export "start-blocking-write") + (local $ret i32) + + ;; prepare the write buffer + (i64.store (i32.const 8) (i64.const 0x123456789abcdef)) + + ;; start one blocking write and immediately cancel it + (local.set $ret (call $stream.write (global.get $sw) (i32.const 8) (i32.const 8))) + (if (i32.ne (i32.const -1 (; BLOCKED ;)) (local.get $ret)) + (then unreachable)) + (local.set $ret (call $stream.cancel-write (global.get $sw))) + (if (i32.ne (i32.const 0x2 (; CANCELLED ;)) (local.get $ret)) + (then unreachable)) + + ;; start a second blockign write and leave it pending + (local.set $ret (call $stream.write (global.get $sw) (i32.const 8) (i32.const 8))) + (if (i32.ne (i32.const -1 (; BLOCKED ;)) (local.get $ret)) + (then unreachable)) + ) + + (func $cancel-after-read4 (export "cancel-after-read4") + (local $ret i32) + (local.set $ret (call $stream.cancel-write (global.get $sw))) + (if (i32.ne (i32.const 0x42 (; CANCELLED=2 | (4<<4) ;)) (local.get $ret)) + (then unreachable)) + ) + ) + (type $ST (stream u8)) + (canon task.return (result u32) (core func $task.return)) + (canon stream.new $ST (core func $stream.new)) + (canon stream.write $ST async (memory $memory "mem") (core func $stream.write)) + (canon stream.cancel-write $ST (core func $stream.cancel-write)) + (canon stream.close-writable $ST (core func $stream.close-writable)) + (core instance $cm (instantiate $CM (with "" (instance + (export "mem" (memory $memory "mem")) + (export "task.return" (func $task.return)) + (export "stream.new" (func $stream.new)) + (export "stream.write" (func $stream.write)) + (export "stream.cancel-write" (func $stream.cancel-write)) + (export "stream.close-writable" (func $stream.close-writable)) + )))) + (func (export "start-stream") (result (stream u8)) (canon lift (core func $cm "start-stream"))) + (func (export "write4") (canon lift (core func $cm "write4"))) + (func (export "write4-and-close") (canon lift (core func $cm "write4-and-close"))) + (func (export "start-blocking-write") (canon lift (core func $cm "start-blocking-write"))) + (func (export "cancel-after-read4") (canon lift (core func $cm "cancel-after-read4"))) + ) + + (component $D + (import "c" (instance $c + (export "start-stream" (func (result (stream u8)))) + (export "write4" (func)) + (export "write4-and-close" (func)) + (export "start-blocking-write" (func)) + (export "cancel-after-read4" (func)) + )) + + (core module $Memory (memory (export "mem") 1)) + (core instance $memory (instantiate $Memory)) + (core module $DM + (import "" "mem" (memory 1)) + (import "" "stream.read" (func $stream.read (param i32 i32 i32) (result i32))) + (import "" "stream.cancel-read" (func $stream.cancel-read (param i32) (result i32))) + (import "" "stream.close-readable" (func $stream.close-readable (param i32))) + (import "" "start-stream" (func $start-stream (result i32))) + (import "" "write4" (func $write4)) + (import "" "write4-and-close" (func $write4-and-close)) + (import "" "start-blocking-write" (func $start-blocking-write)) + (import "" "cancel-after-read4" (func $cancel-after-read4)) + + (func $run (export "run") (result i32) + (local $ret i32) + (local $sr i32) + + ;; call 'start-stream' to get the stream we'll be working with + (local.set $sr (call $start-stream)) + (if (i32.ne (i32.const 1) (local.get $sr)) + (then unreachable)) + + ;; start read that will block + (local.set $ret (call $stream.read (local.get $sr) (i32.const 8) (i32.const 100))) + (if (i32.ne (i32.const -1 (; BLOCKED;)) (local.get $ret)) + (then unreachable)) + + ;; cancelling it will finish without anything having been written + (local.set $ret (call $stream.cancel-read (local.get $sr))) + (if (i32.ne (i32.const 0x2 (; CANCELLED ;)) (local.get $ret)) + (then unreachable)) + + ;; read, block, call $C to write 4 bytes into the buffer, + ;; then cancel, which should show "4+cancelled" + (local.set $ret (call $stream.read (local.get $sr) (i32.const 8) (i32.const 100))) + (if (i32.ne (i32.const -1 (; BLOCKED;)) (local.get $ret)) + (then unreachable)) + (call $write4) + (local.set $ret (call $stream.cancel-read (local.get $sr))) + (if (i32.ne (i32.const 0x42 (; CANCELLED=2 | (4<<4) ;)) (local.get $ret)) + (then unreachable)) + (if (i32.ne (i32.const 0xabcd) (i32.load (i32.const 8))) + (then unreachable)) + + ;; read, block, call $C to write 4 bytes into the buffer and close, + ;; then cancel, which should show "4+closed" + (local.set $ret (call $stream.read (local.get $sr) (i32.const 8) (i32.const 100))) + (if (i32.ne (i32.const -1 (; BLOCKED;)) (local.get $ret)) + (then unreachable)) + (call $write4-and-close) + (local.set $ret (call $stream.cancel-read (local.get $sr))) + (if (i32.ne (i32.const 0x41 (; CLOSED=1 | (4<<4) ;)) (local.get $ret)) + (then unreachable)) + (if (i32.ne (i32.const 0xabcd) (i32.load (i32.const 8))) + (then unreachable)) + (call $stream.close-readable (local.get $sr)) + + ;; get a new $sr + (local.set $sr (call $start-stream)) + (if (i32.ne (i32.const 1) (local.get $sr)) + (then unreachable)) + + ;; start outstanding write in $C, read 4 of it, then call back into $C + ;; which will cancel and see 4 written. + (call $start-blocking-write) + (local.set $ret (call $stream.read (local.get $sr) (i32.const 8) (i32.const 4))) + (if (i32.ne (i32.const 0x40 (; COMPLETED=0 | (4<<4) ;)) (local.get $ret)) + (then unreachable)) + (if (i32.ne (i32.const 0x89abcdef) (i32.load (i32.const 8))) + (then unreachable)) + (call $cancel-after-read4) + + ;; return 42 to the top-level assert_return + (i32.const 42) + ) + ) + (type $ST (stream u8)) + (canon stream.read $ST async (memory $memory "mem") (core func $stream.read)) + (canon stream.cancel-read $ST (core func $stream.cancel-read)) + (canon stream.close-readable $ST (core func $stream.close-readable)) + (canon lower (func $c "start-stream") (core func $start-stream')) + (canon lower (func $c "write4") (core func $write4')) + (canon lower (func $c "write4-and-close") (core func $write4-and-close')) + (canon lower (func $c "start-blocking-write") (core func $start-blocking-write')) + (canon lower (func $c "cancel-after-read4") (core func $cancel-after-read4')) + (core instance $dm (instantiate $DM (with "" (instance + (export "mem" (memory $memory "mem")) + (export "stream.read" (func $stream.read)) + (export "stream.cancel-read" (func $stream.cancel-read)) + (export "stream.close-readable" (func $stream.close-readable)) + (export "start-stream" (func $start-stream')) + (export "write4" (func $write4')) + (export "write4-and-close" (func $write4-and-close')) + (export "start-blocking-write" (func $start-blocking-write')) + (export "cancel-after-read4" (func $cancel-after-read4')) + )))) + (func (export "run") (result u32) (canon lift (core func $dm "run"))) + ) + + (instance $c (instantiate $C)) + (instance $d (instantiate $D (with "c" (instance $c)))) + (func (export "run") (alias export $d "run")) +) +(assert_return (invoke "run") (u32.const 42)) diff --git a/test/async/cancel-subtask.wast b/test/async/cancel-subtask.wast new file mode 100644 index 00000000..3d494ee4 --- /dev/null +++ b/test/async/cancel-subtask.wast @@ -0,0 +1,108 @@ +;; This test contains two components $C and $D where $D imports and calls $C. +;; $D.run calls $C.f, which blocks on an empty waitable set +;; $D.run then subtask.cancels $C.f, which resumes $C.f which promptly resolves +;; without returning a value. +(component + (component $C + (core module $Memory (memory (export "mem") 1)) + (core instance $memory (instantiate $Memory)) + (core module $CM + (import "" "mem" (memory 1)) + (import "" "task.cancel" (func $task.cancel)) + (import "" "waitable.join" (func $waitable.join (param i32 i32))) + (import "" "waitable-set.new" (func $waitable-set.new (result i32))) + + ;; $ws is waited on by 'f' + (global $ws (mut i32) (i32.const 0)) + (func $start (global.set $ws (call $waitable-set.new))) + (start $start) + + (func $f (export "f") (result i32) + ;; wait on $ws which is currently empty, expected to get cancelled + (i32.or (i32.const 2 (; WAIT ;)) (i32.shl (global.get $ws) (i32.const 4))) + ) + (func $f_cb (export "f_cb") (param $event_code i32) (param $index i32) (param $payload i32) (result i32) + ;; confirm that we've received a cancellation request + (if (i32.ne (local.get $event_code) (i32.const 6 (; TASK_CANCELLED ;))) + (then unreachable)) + (if (i32.ne (local.get $index) (i32.const 0)) + (then unreachable)) + (if (i32.ne (local.get $payload) (i32.const 0)) + (then unreachable)) + + ;; finish without returning a value + (call $task.cancel) + (i32.const 0 (; EXIT ;)) + ) + ) + (canon task.cancel (core func $task.cancel)) + (canon waitable.join (core func $waitable.join)) + (canon waitable-set.new (core func $waitable-set.new)) + (core instance $cm (instantiate $CM (with "" (instance + (export "mem" (memory $memory "mem")) + (export "task.cancel" (func $task.cancel)) + (export "waitable.join" (func $waitable.join)) + (export "waitable-set.new" (func $waitable-set.new)) + )))) + (func (export "f") (result u32) (canon lift + (core func $cm "f") + async (callback (func $cm "f_cb")) + )) + ) + + (component $D + (import "f" (func $f (result u32))) + + (core module $Memory (memory (export "mem") 1)) + (core instance $memory (instantiate $Memory)) + (core module $DM + (import "" "mem" (memory 1)) + (import "" "subtask.cancel" (func $subtask.cancel (param i32) (result i32))) + (import "" "subtask.drop" (func $subtask.drop (param i32))) + (import "" "f" (func $f (param i32 i32) (result i32))) + + (func $run (export "run") (result i32) + (local $ret i32) (local $retp i32) + (local $subtask i32) + (local $event_code i32) + + ;; call 'f'; it should block + (local.set $retp (i32.const 4)) + (i32.store (local.get $retp) (i32.const 0xbad0bad0)) + (local.set $ret (call $f (i32.const 0xdeadbeef) (local.get $retp))) + (if (i32.ne (i32.const 1 (; STARTED ;)) (i32.and (local.get $ret) (i32.const 0xf))) + (then unreachable)) + (local.set $subtask (i32.shr_u (local.get $ret) (i32.const 4))) + + ;; cancel 'f'; it should complete without blocking + (local.set $ret (call $subtask.cancel (local.get $subtask))) + (if (i32.ne (i32.const 4 (; CANCELLED_BEFORE_RETURNED ;)) (local.get $ret)) + (then unreachable)) + + ;; The $retp memory shouldn't have changed + (if (i32.ne (i32.load (local.get $retp)) (i32.const 0xbad0bad0)) + (then unreachable)) + + (call $subtask.drop (local.get $subtask)) + + ;; return to the top-level assert_return + (i32.const 42) + ) + ) + (canon subtask.cancel (core func $subtask.cancel)) + (canon subtask.drop (core func $subtask.drop)) + (canon lower (func $f) async (memory $memory "mem") (core func $f')) + (core instance $dm (instantiate $DM (with "" (instance + (export "mem" (memory $memory "mem")) + (export "subtask.cancel" (func $subtask.cancel)) + (export "subtask.drop" (func $subtask.drop)) + (export "f" (func $f')) + )))) + (func (export "run") (result u32) (canon lift (core func $dm "run"))) + ) + + (instance $c (instantiate $C)) + (instance $d (instantiate $D (with "f" (func $c "f")))) + (func (export "run") (alias export $d "run")) +) +(assert_return (invoke "run") (u32.const 42)) diff --git a/test/async/close-stream.wast b/test/async/close-stream.wast new file mode 100644 index 00000000..2f8d2053 --- /dev/null +++ b/test/async/close-stream.wast @@ -0,0 +1,160 @@ +;; This test contains two components $C and $D that test that traps occur +;; when closing the readable or writable end of stream while a read or write +;; is pending. In particular, even if a partial copy has happened into the +;; buffer such that waiting/polling for an event *would* produce a STREAM +;; READ/WRITE event, if the event has not been delivered, the operation is +;; still considered pending. +(component definition $Tester + (component $C + (core module $Memory (memory (export "mem") 1)) + (core instance $memory (instantiate $Memory)) + (core module $CM + (import "" "mem" (memory 1)) + (import "" "stream.new" (func $stream.new (result i64))) + (import "" "stream.write" (func $stream.write (param i32 i32 i32) (result i32))) + (import "" "stream.close-writable" (func $stream.close-writable (param i32))) + + (global $sw (mut i32) (i32.const 0)) + + (func $start-stream (export "start-stream") (result i32) + ;; create a new stream, return the readable end to the caller + (local $ret64 i64) + (local.set $ret64 (call $stream.new)) + (global.set $sw (i32.wrap_i64 (i64.shr_u (local.get $ret64) (i64.const 32)))) + (i32.wrap_i64 (local.get $ret64)) + ) + (func $write4 (export "write4") + ;; write 6 bytes into the stream, expecting to rendezvous with a stream.read + (local $ret i32) + (i32.store (i32.const 8) (i32.const 0x12345678)) + (local.set $ret (call $stream.write (global.get $sw) (i32.const 8) (i32.const 4))) + (if (i32.ne (i32.const 0x40 (; COMPLETED=0 | (4<<4) ;)) (local.get $ret)) + (then unreachable)) + ) + (func $start-blocking-write (export "start-blocking-write") + (local $ret i32) + + ;; prepare the write buffer + (i64.store (i32.const 8) (i64.const 0x123456789abcdef)) + + ;; start a blocking write + (local.set $ret (call $stream.write (global.get $sw) (i32.const 8) (i32.const 8))) + (if (i32.ne (i32.const -1 (; BLOCKED ;)) (local.get $ret)) + (then unreachable)) + ) + (func $close-writable (export "close-writable") + ;; boom + (call $stream.close-writable (global.get $sw)) + ) + ) + (type $ST (stream u8)) + (canon stream.new $ST (core func $stream.new)) + (canon stream.write $ST async (memory $memory "mem") (core func $stream.write)) + (canon stream.close-writable $ST (core func $stream.close-writable)) + (core instance $cm (instantiate $CM (with "" (instance + (export "mem" (memory $memory "mem")) + (export "stream.new" (func $stream.new)) + (export "stream.write" (func $stream.write)) + (export "stream.close-writable" (func $stream.close-writable)) + )))) + (func (export "start-stream") (result (stream u8)) (canon lift (core func $cm "start-stream"))) + (func (export "write4") (canon lift (core func $cm "write4"))) + (func (export "start-blocking-write") (canon lift (core func $cm "start-blocking-write"))) + (func (export "close-writable") (canon lift (core func $cm "close-writable"))) + ) + (component $D + (import "c" (instance $c + (export "start-stream" (func (result (stream u8)))) + (export "write4" (func)) + (export "start-blocking-write" (func)) + (export "close-writable" (func)) + )) + + (core module $Memory (memory (export "mem") 1)) + (core instance $memory (instantiate $Memory)) + (core module $Core + (import "" "mem" (memory 1)) + (import "" "stream.new" (func $stream.new (result i64))) + (import "" "stream.read" (func $stream.read (param i32 i32 i32) (result i32))) + (import "" "stream.write" (func $stream.write (param i32 i32 i32) (result i32))) + (import "" "stream.close-readable" (func $stream.close-readable (param i32))) + (import "" "stream.close-writable" (func $stream.close-writable (param i32))) + (import "" "start-stream" (func $start-stream (result i32))) + (import "" "write4" (func $write4)) + (import "" "start-blocking-write" (func $start-blocking-write)) + (import "" "close-writable" (func $close-writable)) + + (func (export "close-while-reading") + (local $ret i32) (local $sr i32) + + ;; call 'start-stream' to get the stream we'll be working with + (local.set $sr (call $start-stream)) + (if (i32.ne (i32.const 1) (local.get $sr)) + (then unreachable)) + + ;; start a blocking read + (local.set $ret (call $stream.read (local.get $sr) (i32.const 8) (i32.const 100))) + (if (i32.ne (i32.const -1 (; BLOCKED;)) (local.get $ret)) + (then unreachable)) + + ;; write into the buffer, but the read is still in progress since we + ;; haven't received notification yet. + (call $write4) + (if (i32.ne (i32.const 0x12345678) (i32.load (i32.const 8))) + (then unreachable)) + + ;; boom + (call $stream.close-readable (local.get $sr)) + ) + (func (export "close-while-writing") + (local $ret i32) (local $sr i32) + + ;; call 'start-stream' to get the stream we'll be working with + (local.set $sr (call $start-stream)) + (if (i32.ne (i32.const 1) (local.get $sr)) + (then unreachable)) + + ;; start a blocking write and partially read from it + (call $start-blocking-write) + (local.set $ret (call $stream.read (local.get $sr) (i32.const 8) (i32.const 4))) + (if (i32.ne (i32.const 0x40 (; COMPLETED=0 | (4<<4) ;)) (local.get $ret)) + (then unreachable)) + (if (i32.ne (i32.const 0x89abcdef) (i32.load (i32.const 8))) + (then unreachable)) + (call $close-writable) + ) + ) + (type $ST (stream u8)) + (canon stream.new $ST (core func $stream.new)) + (canon stream.read $ST async (memory $memory "mem") (core func $stream.read)) + (canon stream.write $ST async (memory $memory "mem") (core func $stream.write)) + (canon stream.close-readable $ST (core func $stream.close-readable)) + (canon stream.close-writable $ST (core func $stream.close-writable)) + (canon lower (func $c "start-stream") (core func $start-stream')) + (canon lower (func $c "write4") (core func $write4')) + (canon lower (func $c "start-blocking-write") (core func $start-blocking-write')) + (canon lower (func $c "close-writable") (core func $close-writable')) + (core instance $core (instantiate $Core (with "" (instance + (export "mem" (memory $memory "mem")) + (export "stream.new" (func $stream.new)) + (export "stream.read" (func $stream.read)) + (export "stream.write" (func $stream.write)) + (export "stream.close-readable" (func $stream.close-readable)) + (export "stream.close-writable" (func $stream.close-writable)) + (export "start-stream" (func $start-stream')) + (export "write4" (func $write4')) + (export "start-blocking-write" (func $start-blocking-write')) + (export "close-writable" (func $close-writable')) + )))) + (func (export "close-while-reading") (canon lift (core func $core "close-while-reading"))) + (func (export "close-while-writing") (canon lift (core func $core "close-while-writing"))) + ) + (instance $c (instantiate $C)) + (instance $d (instantiate $D (with "c" (instance $c)))) + (func (export "close-while-reading") (alias export $d "close-while-reading")) + (func (export "close-while-writing") (alias export $d "close-while-writing")) +) +(component instance $new-tester-instance $Tester) +(assert_trap (invoke "close-while-reading") "cannot drop busy stream or future") +(component instance $new-tester-instance $Tester) +(assert_trap (invoke "close-while-writing") "cannot drop busy stream or future") diff --git a/test/async/deadlock.wast b/test/async/deadlock.wast new file mode 100644 index 00000000..0c9861b8 --- /dev/null +++ b/test/async/deadlock.wast @@ -0,0 +1,73 @@ +;; This test defines components $C and $D where $D imports and calls $C +;; $C.f waits on an empty waitable set +;; $D.g calls $C.f and then waits for it to finish, which fails due to deadlock +(component + (component $C + (core module $Memory (memory (export "mem") 1)) + (core instance $memory (instantiate $Memory)) + (core module $CM + (import "" "mem" (memory 1)) + (import "" "waitable-set.new" (func $waitable-set.new (result i32))) + + (func (export "f") (result i32) + ;; wait on a new empty waitable set + (local $ws i32) + (local.set $ws (call $waitable-set.new)) + (i32.or (i32.const 2 (; WAIT ;)) (i32.shl (local.get $ws) (i32.const 4))) + ) + (func (export "cb") (param $event_code i32) (param $index i32) (param $payload i32) (result i32) + unreachable + ) + ) + (canon waitable-set.new (core func $waitable-set.new)) + (core instance $cm (instantiate $CM (with "" (instance + (export "mem" (memory $memory "mem")) + (export "waitable-set.new" (func $waitable-set.new)) + )))) + (func (export "f") (result u32) (canon lift + (core func $cm "f") + async (memory $memory "mem") (callback (func $cm "cb")) + )) + ) + + (component $D + (import "f" (func $f (result u32))) + + (core module $Memory (memory (export "mem") 1)) + (core instance $memory (instantiate $Memory)) + (core module $DM + (import "" "mem" (memory 1)) + (import "" "waitable.join" (func $waitable.join (param i32 i32))) + (import "" "waitable-set.new" (func $waitable-set.new (result i32))) + (import "" "waitable-set.wait" (func $waitable-set.wait (param i32 i32) (result i32))) + (import "" "f" (func $f (param i32 i32) (result i32))) + + (func (export "g") (result i32) + (local $ws i32) (local $ret i32) (local $subtaski i32) + (local.set $ret (call $f (i32.const 0) (i32.const 0))) + (local.set $subtaski (i32.shr_u (local.get $ret) (i32.const 4))) + (local.set $ws (call $waitable-set.new)) + (call $waitable.join (local.get $subtaski) (local.get $ws)) + (call $waitable-set.wait (local.get $ws) (i32.const 0)) + unreachable + ) + ) + (canon waitable.join (core func $waitable.join)) + (canon waitable-set.new (core func $waitable-set.new)) + (canon waitable-set.wait (memory $memory "mem") (core func $waitable-set.wait)) + (canon lower (func $f) async (memory $memory "mem") (core func $f')) + (core instance $dm (instantiate $DM (with "" (instance + (export "mem" (memory $memory "mem")) + (export "waitable.join" (func $waitable.join)) + (export "waitable-set.new" (func $waitable-set.new)) + (export "waitable-set.wait" (func $waitable-set.wait)) + (export "f" (func $f')) + )))) + (func (export "f") (result u32) (canon lift (core func $dm "g"))) + ) + + (instance $c (instantiate $C)) + (instance $d (instantiate $D (with "f" (func $c "f")))) + (func (export "f") (alias export $d "f")) +) +(assert_trap (invoke "f") "wasm trap: deadlock detected: event loop cannot make further progress") diff --git a/test/async/drop-subtask.wast b/test/async/drop-subtask.wast new file mode 100644 index 00000000..1959866b --- /dev/null +++ b/test/async/drop-subtask.wast @@ -0,0 +1,140 @@ +;; This test contains two components: $Looper and $Caller. +;; $Caller starts an async subtask for $Looper.loop and then drops these +;; subtasks in both allowed and disallowed cases, testing for success and +;; traps. +(component + (component $Looper + (core module $Memory (memory (export "mem") 1)) + (core instance $memory (instantiate $Memory)) + (core module $CoreLooper + (import "" "mem" (memory 1)) + (import "" "task.return" (func $task.return)) + + (global $done (mut i32) (i32.const 0)) + + (func $loop (export "loop") (result i32) + (i32.const 1 (; YIELD ;)) + ) + (func $loop_cb (export "loop_cb") (param $event_code i32) (param $index i32) (param $payload i32) (result i32) + ;; confirm that we've received a cancellation request + (if (i32.ne (local.get $event_code) (i32.const 0 (; NONE ;))) + (then unreachable)) + (if (i32.ne (local.get $index) (i32.const 0)) + (then unreachable)) + (if (i32.ne (local.get $payload) (i32.const 0)) + (then unreachable)) + + (if (i32.eqz (global.get $done)) + (then (return (i32.const 1 (; YIELD ;))))) + (call $task.return) + (i32.const 0 (; EXIT ;)) + ) + + (func $return (export "return") + (global.set $done (i32.const 1)) + ) + ) + (canon task.return (core func $task.return)) + (core instance $core_looper (instantiate $CoreLooper (with "" (instance + (export "mem" (memory $memory "mem")) + (export "task.return" (func $task.return)) + )))) + (func (export "loop") (canon lift + (core func $core_looper "loop") + async (callback (func $core_looper "loop_cb")) + )) + (func (export "return") (canon lift + (core func $core_looper "return") + )) + ) + + (component $Caller + (import "looper" (instance $looper + (export "loop" (func)) + (export "return" (func)) + )) + + (core module $Memory (memory (export "mem") 1)) + (core instance $memory (instantiate $Memory)) + (core module $CoreCaller + (import "" "mem" (memory 1)) + (import "" "subtask.drop" (func $subtask.drop (param i32))) + (import "" "waitable.join" (func $waitable.join (param i32 i32))) + (import "" "waitable-set.new" (func $waitable-set.new (result i32))) + (import "" "waitable-set.wait" (func $waitable-set.wait (param i32 i32) (result i32))) + (import "" "loop" (func $loop (param i32 i32) (result i32))) + (import "" "return" (func $return)) + + (func $drop-after-return (export "drop-after-return") (result i32) + (local $ret i32) (local $ws i32) (local $subtask i32) + + ;; start 'loop' + (local.set $ret (call $loop (i32.const 0xdead) (i32.const 0xbeef))) + (if (i32.ne (i32.const 1 (; STARTED ;)) (i32.and (local.get $ret) (i32.const 0xf))) + (then unreachable)) + (local.set $subtask (i32.shr_u (local.get $ret) (i32.const 4))) + + ;; tell 'loop' to stop + (call $return) + + ;; wait for 'loop' to run and return + (local.set $ws (call $waitable-set.new)) + (call $waitable.join (local.get $subtask) (local.get $ws)) + (local.set $ret (call $waitable-set.wait (local.get $ws) (i32.const 0))) + (if (i32.ne (i32.const 1 (; SUBTASK ;)) (local.get $ret)) + (then unreachable)) + (if (i32.ne (local.get $subtask) (i32.load (i32.const 0))) + (then unreachable)) + (if (i32.ne (i32.const 2 (; RETURNED ;)) (i32.load (i32.const 4))) + (then unreachable)) + + ;; ok to drop + (call $subtask.drop (local.get $subtask)) + (i32.const 42) + ) + + (func $drop-before-return (export "drop-before-return") (result i32) + (local $ret i32) (local $subtask i32) + + ;; start 'loop' + (local.set $ret (call $loop (i32.const 0xdead) (i32.const 0xbeef))) + (if (i32.ne (i32.const 1 (; STARTED ;)) (i32.and (local.get $ret) (i32.const 0xf))) + (then unreachable)) + (local.set $subtask (i32.shr_u (local.get $ret) (i32.const 4))) + + ;; this should trap + (call $subtask.drop (local.get $subtask)) + unreachable + ) + ) + (canon subtask.drop (core func $subtask.drop)) + (canon waitable.join (core func $waitable.join)) + (canon waitable-set.new (core func $waitable-set.new)) + (canon waitable-set.wait (memory $memory "mem") (core func $waitable-set.wait)) + (canon lower (func $looper "loop") async (memory $memory "mem") (core func $loop')) + (canon lower (func $looper "return") (memory $memory "mem") (core func $return')) + (core instance $core_caller (instantiate $CoreCaller (with "" (instance + (export "mem" (memory $memory "mem")) + (export "subtask.drop" (func $subtask.drop)) + (export "waitable.join" (func $waitable.join)) + (export "waitable-set.new" (func $waitable-set.new)) + (export "waitable-set.wait" (func $waitable-set.wait)) + (export "loop" (func $loop')) + (export "return" (func $return')) + )))) + (func (export "drop-after-return") (result u32) (canon lift + (core func $core_caller "drop-after-return") + )) + (func (export "drop-before-return") (result u32) (canon lift + (core func $core_caller "drop-before-return") + )) + ) + + (instance $looper (instantiate $Looper)) + (instance $caller1 (instantiate $Caller (with "looper" (instance $looper)))) + (instance $caller2 (instantiate $Caller (with "looper" (instance $looper)))) + (func (export "drop-after-return") (alias export $caller1 "drop-after-return")) + (func (export "drop-before-return") (alias export $caller2 "drop-before-return")) +) +(assert_return (invoke "drop-after-return") (u32.const 42)) +(assert_trap (invoke "drop-before-return") "cannot drop a subtask which has not yet resolved") diff --git a/test/async/drop-waitable-set.wast b/test/async/drop-waitable-set.wast new file mode 100644 index 00000000..cf30223d --- /dev/null +++ b/test/async/drop-waitable-set.wast @@ -0,0 +1,84 @@ +;; This test contains two components $C and $D +;; $D.run drives the test and first calls $C.wait-on-set, which waits on +;; a waitable-set. Then $D.run calls $C.drop-while-waiting which attempts +;; to drop the same waitable-set, which should trap. +(component + (component $C + (core module $Core + (import "" "waitable-set.new" (func $waitable-set.new (result i32))) + (import "" "waitable-set.drop" (func $waitable-set.drop (param i32))) + + (global $ws (mut i32) (i32.const 0)) + (func $start (global.set $ws (call $waitable-set.new))) + (start $start) + + (func $wait-on-set (export "wait-on-set") (result i32) + ;; wait on $ws + (i32.or (i32.const 2 (; WAIT ;)) (i32.shl (global.get $ws) (i32.const 4))) + ) + (func $drop-while-waiting (export "drop-while-waiting") (result i32) + ;; boom + (call $waitable-set.drop (global.get $ws)) + unreachable + ) + (func $unreachable-cb (export "unreachable-cb") (param i32 i32 i32) (result i32) + unreachable + ) + ) + (canon waitable-set.new (core func $waitable-set.new)) + (canon waitable-set.drop (core func $waitable-set.drop)) + (core instance $core (instantiate $Core (with "" (instance + (export "waitable-set.new" (func $waitable-set.new)) + (export "waitable-set.drop" (func $waitable-set.drop)) + )))) + (func (export "wait-on-set") (canon lift + (core func $core "wait-on-set") + async (callback (func $core "unreachable-cb")) + )) + (func (export "drop-while-waiting") (canon lift + (core func $core "drop-while-waiting") + async (callback (func $core "unreachable-cb")) + )) + ) + + (component $D + (import "c" (instance $c + (export "wait-on-set" (func)) + (export "drop-while-waiting" (func)) + )) + + (core module $Memory (memory (export "mem") 1)) + (core instance $memory (instantiate $Memory)) + (core module $Core + (import "" "mem" (memory 1)) + (import "" "wait-on-set" (func $wait-on-set (param i32 i32) (result i32))) + (import "" "drop-while-waiting" (func $drop-while-waiting)) + (func $run (export "run") (result i32) + (local $ret i32) + + ;; start an async call to 'wait-on-set' which blocks, waiting on a + ;; waitable-set. + (local.set $ret (call $wait-on-set (i32.const 0xdeadbeef) (i32.const 0xdeadbeef))) + (if (i32.ne (i32.const 0x11) (local.get $ret)) + (then unreachable)) + + ;; this call will try to drop the same waitable-set, which should trap. + (call $drop-while-waiting) + unreachable + ) + ) + (canon lower (func $c "wait-on-set") async (memory $memory "mem") (core func $wait-on-set')) + (canon lower (func $c "drop-while-waiting") (core func $drop-while-waiting')) + (core instance $core (instantiate $Core (with "" (instance + (export "mem" (memory $memory "mem")) + (export "wait-on-set" (func $wait-on-set')) + (export "drop-while-waiting" (func $drop-while-waiting')) + )))) + (func (export "run") (result u32) (canon lift (core func $core "run"))) + ) + + (instance $c (instantiate $C)) + (instance $d (instantiate $D (with "c" (instance $c)))) + (func (export "run") (alias export $d "run")) +) +(assert_trap (invoke "run") "cannot drop waitable set with waiters") diff --git a/test/async/empty-wait.wast b/test/async/empty-wait.wast new file mode 100644 index 00000000..5ee17505 --- /dev/null +++ b/test/async/empty-wait.wast @@ -0,0 +1,199 @@ +;; This test has two components $C and $D, where $D imports and calls $C +;; $C exports two functions: 'blocker' and 'unblocker' +;; 'blocker' blocks on an empty waitable set +;; 'unblocker' wakes blocker by adding a resolved future to blocker's waitable set +;; $D calls 'blocker' then 'unblocker', then waits for 'blocker' to finish +(component + (component $C + (core module $Memory (memory (export "mem") 1)) + (core instance $memory (instantiate $Memory)) + (core module $CM + (import "" "mem" (memory 1)) + (import "" "task.return" (func $task.return (param i32))) + (import "" "waitable.join" (func $waitable.join (param i32 i32))) + (import "" "waitable-set.new" (func $waitable-set.new (result i32))) + (import "" "future.new" (func $future.new (result i64))) + (import "" "future.read" (func $future.read (param i32 i32) (result i32))) + (import "" "future.write" (func $future.write (param i32 i32) (result i32))) + (import "" "future.close-readable" (func $future.close-readable (param i32))) + (import "" "future.close-writable" (func $future.close-writable (param i32))) + + ;; $ws is waited on by 'blocker' and added to by 'unblocker' + (global $ws (mut i32) (i32.const 0)) + (func $start (global.set $ws (call $waitable-set.new))) + (start $start) + + ;; 'unblocker' initializes $futr with the readable end of a resolved future + (global $futr (mut i32) (i32.const 0)) + + (func $blocker (export "blocker") (result i32) + ;; wait on $ws which is currently empty; 'unblocker' will wake us up + (i32.or (i32.const 2 (; WAIT ;)) (i32.shl (global.get $ws) (i32.const 4))) + ) + (func $blocker_cb (export "blocker_cb") (param $event_code i32) (param $index i32) (param $payload i32) (result i32) + ;; assert that we were in fact woken by 'unblocker' adding $futr to $ws + (if (i32.ne (i32.const 4 (; FUTURE_READ ;)) (local.get $event_code)) + (then unreachable)) + (if (i32.ne (global.get $futr) (local.get $index)) + (then unreachable)) + (if (i32.ne (i32.const 0x11 (; CLOSED=1 | (1<<4) ;)) (local.get $payload)) + (then unreachable)) + + (call $future.close-readable (global.get $futr)) + + ;; return 42 to $D.run + (call $task.return (i32.const 42)) + (i32.const 0) + ) + + (func $unblocker (export "unblocker") (result i32) + (local $ret i32) (local $ret64 i64) + (local $futw i32) + + ;; create a future that will be used to unblock 'blocker', storing r/w ends in $futr/$futw + (local.set $ret64 (call $future.new)) + (global.set $futr (i32.wrap_i64 (local.get $ret64))) + (if (i32.ne (i32.const 2) (global.get $futr)) + (then unreachable)) + (local.set $futw (i32.wrap_i64 (i64.shr_u (local.get $ret64) (i64.const 32)))) + (if (i32.ne (i32.const 3) (local.get $futw)) + (then unreachable)) + + ;; perform a future.read which will block, and add this future to the waitable-set + ;; being waited on by 'blocker' + (local.set $ret (call $future.read (global.get $futr) (i32.const 0xdeadbeef))) + (if (i32.ne (i32.const -1 (; BLOCKED ;)) (local.get $ret)) + (then unreachable)) + (call $waitable.join (global.get $futr) (global.get $ws)) + + ;; perform a future.write which will rendezvous with the write and complete + (local.set $ret (call $future.write (local.get $futw) (i32.const 0xdeadbeef))) + (if (i32.ne (i32.const 0x11 (; CLOSED=1 | (1<<4) ;)) (local.get $ret)) + (then unreachable)) + + (call $future.close-writable (local.get $futw)) + + ;; return 43 to $D.run + (call $task.return (i32.const 43)) + (i32.const 0) + ) + (func $unblocker_cb (export "unblocker_cb") (param i32 i32 i32) (result i32) + ;; 'unblocker' doesn't block + unreachable + ) + ) + (type $FT (future)) + (canon task.return (result u32) (core func $task.return)) + (canon waitable.join (core func $waitable.join)) + (canon waitable-set.new (core func $waitable-set.new)) + (canon future.new $FT (core func $future.new)) + (canon future.read $FT async (memory $memory "mem") (core func $future.read)) + (canon future.write $FT async (memory $memory "mem") (core func $future.write)) + (canon future.close-readable $FT (core func $future.close-readable)) + (canon future.close-writable $FT (core func $future.close-writable)) + (core instance $cm (instantiate $CM (with "" (instance + (export "mem" (memory $memory "mem")) + (export "task.return" (func $task.return)) + (export "waitable.join" (func $waitable.join)) + (export "waitable-set.new" (func $waitable-set.new)) + (export "future.new" (func $future.new)) + (export "future.read" (func $future.read)) + (export "future.write" (func $future.write)) + (export "future.close-readable" (func $future.close-readable)) + (export "future.close-writable" (func $future.close-writable)) + )))) + (func (export "blocker") (result u32) (canon lift + (core func $cm "blocker") + async (callback (func $cm "blocker_cb")) + )) + (func (export "unblocker") (result u32) (canon lift + (core func $cm "unblocker") + async (callback (func $cm "unblocker_cb")) + )) + ) + + (component $D + (import "c" (instance $c + (export "blocker" (func (result u32))) + (export "unblocker" (func (result u32))) + )) + + (core module $Memory (memory (export "mem") 1)) + (core instance $memory (instantiate $Memory)) + (core module $DM + (import "" "mem" (memory 1)) + (import "" "waitable.join" (func $waitable.join (param i32 i32))) + (import "" "waitable-set.new" (func $waitable-set.new (result i32))) + (import "" "waitable-set.wait" (func $waitable-set.wait (param i32 i32) (result i32))) + (import "" "subtask.drop" (func $subtask.drop (param i32))) + (import "" "blocker" (func $blocker (param i32 i32) (result i32))) + (import "" "unblocker" (func $unblocker (param i32 i32) (result i32))) + + (global $ws (mut i32) (i32.const 0)) + (func $start (global.set $ws (call $waitable-set.new))) + (start $start) + + (func $run (export "run") (result i32) + (local $ret i32) (local $retp1 i32) (local $retp2 i32) + (local $subtask i32) + (local $event_code i32) + + ;; call 'blocker'; it should block + (local.set $retp1 (i32.const 4)) + (local.set $ret (call $blocker (i32.const 0xdeadbeef) (local.get $retp1))) + (if (i32.ne (i32.const 1 (; STARTED ;)) (i32.and (local.get $ret) (i32.const 0xf))) + (then unreachable)) + (local.set $subtask (i32.shr_u (local.get $ret) (i32.const 4))) + (if (i32.ne (i32.const 2 (; RETURNED=2 | (0<<4) ;)) (local.get $subtask)) + (then unreachable)) + + ;; call 'unblocker' to unblock 'blocker'; it should complete eagerly + (local.set $retp2 (i32.const 8)) + (local.set $ret (call $unblocker (i32.const 0xbeefdead) (local.get $retp2))) + (if (i32.ne (i32.const 2 (; RETURNED ;)) (local.get $ret)) + (then unreachable)) + (if (i32.ne (i32.const 43) (i32.load (local.get $retp2))) + (then unreachable)) + + ;; wait for 'blocker' to be scheduled, run, and return + (call $waitable.join (local.get $subtask) (global.get $ws)) + (local.set $retp2 (i32.const 8)) + (local.set $event_code (call $waitable-set.wait (global.get $ws) (local.get $retp2))) + (if (i32.ne (i32.const 1 (; SUBTASK ;)) (local.get $event_code)) + (then unreachable)) + (if (i32.ne (local.get $subtask) (i32.load (local.get $retp2))) + (then unreachable)) + (if (i32.ne (i32.const 2 (; RETURNED=2 | (0<<4) ;)) (i32.load offset=4 (local.get $retp2))) + (then unreachable)) + (if (i32.ne (i32.const 42) (i32.load (local.get $retp1))) + (then unreachable)) + + (call $subtask.drop (local.get $subtask)) + + ;; return 44 to the top-level test harness + (i32.const 44) + ) + ) + (canon waitable.join (core func $waitable.join)) + (canon waitable-set.new (core func $waitable-set.new)) + (canon waitable-set.wait (memory $memory "mem") (core func $waitable-set.wait)) + (canon subtask.drop (core func $subtask.drop)) + (canon lower (func $c "blocker") async (memory $memory "mem") (core func $blocker')) + (canon lower (func $c "unblocker") async (memory $memory "mem") (core func $unblocker')) + (core instance $dm (instantiate $DM (with "" (instance + (export "mem" (memory $memory "mem")) + (export "waitable.join" (func $waitable.join)) + (export "waitable-set.new" (func $waitable-set.new)) + (export "waitable-set.wait" (func $waitable-set.wait)) + (export "subtask.drop" (func $subtask.drop)) + (export "blocker" (func $blocker')) + (export "unblocker" (func $unblocker')) + )))) + (func (export "run") (result u32) (canon lift (core func $dm "run"))) + ) + + (instance $c (instantiate $C)) + (instance $d (instantiate $D (with "c" (instance $c)))) + (func (export "run") (alias export $d "run")) +) +(assert_return (invoke "run") (u32.const 44)) diff --git a/test/async/partial-stream-copies.wast b/test/async/partial-stream-copies.wast new file mode 100644 index 00000000..42a4b62e --- /dev/null +++ b/test/async/partial-stream-copies.wast @@ -0,0 +1,240 @@ +;; This test has two components $C and $D, where $D imports and calls $C.transform +;; $C.transform takes and returns a stream +;; Before $C.transform blocks the first time, it supplies a 12-byte read buffer +;; When $D.run regains control after $C.transform blocks, it can perform multiple +;; successful writes until it fully uses up the 12-byte buffer. +;; ... and that's where I am so far ... +(component + (component $C + (core module $Memory (memory (export "mem") 1)) + (core instance $memory (instantiate $Memory)) + (core module $CM + (import "" "mem" (memory 1)) + (import "" "task.return" (func $task.return (param i32))) + (import "" "waitable.join" (func $waitable.join (param i32 i32))) + (import "" "waitable-set.new" (func $waitable-set.new (result i32))) + (import "" "stream.new" (func $stream.new (result i64))) + (import "" "stream.read" (func $stream.read (param i32 i32 i32) (result i32))) + (import "" "stream.write" (func $stream.write (param i32 i32 i32) (result i32))) + (import "" "stream.close-readable" (func $stream.close-readable (param i32))) + (import "" "stream.close-writable" (func $stream.close-writable (param i32))) + + ;; $ws is waited on by 'transform' + (global $ws (mut i32) (i32.const 0)) + (func $start (global.set $ws (call $waitable-set.new))) + (start $start) + + ;; $insr/$outsw are read/written by 'transform' + (global $insr (mut i32) (i32.const 0)) + (global $inbufp (mut i32) (i32.const 0x10)) + (global $outsw (mut i32) (i32.const 0)) + (global $outbufp (mut i32) (i32.const 0x20)) + + (func $transform (export "transform") (param i32) (result i32) + (local $ret i32) (local $ret64 i64) (local $outsr i32) + + ;; check the incoming readable stream end + (global.set $insr (local.get 0)) + (if (i32.ne (i32.const 2) (global.get $insr)) + (then unreachable)) + + ;; create a new stream r/w pair $outsr/$outsw + (local.set $ret64 (call $stream.new)) + (local.set $outsr (i32.wrap_i64 (local.get $ret64))) + (if (i32.ne (i32.const 3) (local.get $outsr)) + (then unreachable)) + (global.set $outsw (i32.wrap_i64 (i64.shr_u (local.get $ret64) (i64.const 32)))) + (if (i32.ne (i32.const 4) (global.get $outsw)) + (then unreachable)) + + ;; start async read on $insr which will block + (local.set $ret (call $stream.read (global.get $insr) (global.get $inbufp) (i32.const 12))) + (if (i32.ne (i32.const -1 (; BLOCKED ;)) (local.get $ret)) + (then unreachable)) + + ;; return the readable end of the outgoing stream to the caller + (call $task.return (local.get $outsr)) + + ;; wait for the stream.read/write to complete + (call $waitable.join (global.get $insr) (global.get $ws)) + (call $waitable.join (global.get $outsw) (global.get $ws)) + (i32.or (i32.const 2 (; WAIT ;)) (i32.shl (global.get $ws) (i32.const 4))) + ) + (func $transform_cb (export "transform_cb") (param $event_code i32) (param $index i32) (param $payload i32) (result i32) + (local $ret i32) (local $ret64 i64) + + ;; confirm the read succeeded fully + (if (i32.ne (local.get $event_code) (i32.const 2 (; STREAM_READ ;))) + (then unreachable)) + (if (i32.ne (local.get $index) (global.get $insr)) + (then unreachable)) + (if (i32.ne (local.get $payload) (i32.const 0xc0 (; COMPLETED=0 | (12 << 4) ;))) + (then unreachable)) + (if (i32.ne (i32.const 0x89abcdef) (i32.load offset=0 (global.get $inbufp))) + (then unreachable)) + (if (i32.ne (i32.const 0x01234567) (i32.load offset=4 (global.get $inbufp))) + (then unreachable)) + (if (i32.ne (i32.const 0x89abcdef) (i32.load offset=8 (global.get $inbufp))) + (then unreachable)) + + ;; multiple read calls succeed until 12-byte buffer is consumed + (local.set $ret (call $stream.read (global.get $insr) (global.get $inbufp) (i32.const 4))) + (if (i32.ne (i32.const 0x40) (local.get $ret)) + (then unreachable)) + (if (i32.ne (i32.const 0x76543210) (i32.load (global.get $inbufp))) + (then unreachable)) + (local.set $ret (call $stream.read (global.get $insr) (global.get $inbufp) (i32.const 2))) + (if (i32.ne (i32.const 0x20) (local.get $ret)) + (then unreachable)) + (if (i32.ne (i32.const 0xba98) (i32.load16_u (global.get $inbufp))) + (then unreachable)) + (local.set $ret (call $stream.read (global.get $insr) (global.get $inbufp) (i32.const 8))) + (if (i32.ne (i32.const 0x60) (local.get $ret)) + (then unreachable)) + (if (i32.ne (i32.const 0x3210fedc) (i32.load (global.get $inbufp))) + (then unreachable)) + (if (i32.ne (i32.const 0x7654) (i32.load16_u offset=4 (global.get $inbufp))) + (then unreachable)) + + (call $stream.close-readable (global.get $insr)) + (call $stream.close-writable (global.get $outsw)) + (return (i32.const 0 (; EXIT ;))) + ) + ) + (type $ST (stream u8)) + (canon task.return (result $ST) (memory $memory "mem") (core func $task.return)) + (canon waitable.join (core func $waitable.join)) + (canon waitable-set.new (core func $waitable-set.new)) + (canon stream.new $ST (core func $stream.new)) + (canon stream.read $ST async (memory $memory "mem") (core func $stream.read)) + (canon stream.write $ST async (memory $memory "mem") (core func $stream.write)) + (canon stream.close-readable $ST (core func $stream.close-readable)) + (canon stream.close-writable $ST (core func $stream.close-writable)) + (core instance $cm (instantiate $CM (with "" (instance + (export "mem" (memory $memory "mem")) + (export "task.return" (func $task.return)) + (export "waitable.join" (func $waitable.join)) + (export "waitable-set.new" (func $waitable-set.new)) + (export "stream.new" (func $stream.new)) + (export "stream.read" (func $stream.read)) + (export "stream.write" (func $stream.write)) + (export "stream.close-readable" (func $stream.close-readable)) + (export "stream.close-writable" (func $stream.close-writable)) + )))) + (func (export "transform") (param "in" (stream u8)) (result (stream u8)) (canon lift + (core func $cm "transform") + async (memory $memory "mem") (callback (func $cm "transform_cb")) + )) + ) + + (component $D + (import "transform" (func $transform (param "in" (stream u8)) (result (stream u8)))) + + (core module $Memory (memory (export "mem") 1)) + (core instance $memory (instantiate $Memory)) + (core module $DM + (import "" "mem" (memory 1)) + (import "" "waitable.join" (func $waitable.join (param i32 i32))) + (import "" "waitable-set.new" (func $waitable-set.new (result i32))) + (import "" "waitable-set.wait" (func $waitable-set.wait (param i32 i32) (result i32))) + (import "" "stream.new" (func $stream.new (result i64))) + (import "" "stream.read" (func $stream.read (param i32 i32 i32) (result i32))) + (import "" "stream.write" (func $stream.write (param i32 i32 i32) (result i32))) + (import "" "stream.close-readable" (func $stream.close-readable (param i32))) + (import "" "stream.close-writable" (func $stream.close-writable (param i32))) + (import "" "transform" (func $transform (param i32 i32) (result i32))) + + (func $run (export "run") (result i32) + (local $ret i32) (local $ret64 i64) (local $paramp i32) (local $retp i32) + (local $insr i32) (local $insw i32) (local $outsr i32) + (local $subtask i32) (local $event_code i32) (local $index i32) (local $payload i32) + (local $ws i32) + + ;; create a new stream r/w pair $insr/$insw + (local.set $ret64 (call $stream.new)) + (local.set $insr (i32.wrap_i64 (local.get $ret64))) + (if (i32.ne (i32.const 1) (local.get $insr)) + (then unreachable)) + (local.set $insw (i32.wrap_i64 (i64.shr_u (local.get $ret64) (i64.const 32)))) + (if (i32.ne (i32.const 2) (local.get $insw)) + (then unreachable)) + + ;; call 'transform' which will return a readable stream $outsr eagerly + (local.set $paramp (i32.const 4)) + (local.set $retp (i32.const 8)) + (i32.store (local.get $paramp) (local.get $insr)) + (local.set $ret (call $transform (local.get $paramp) (local.get $retp))) + (if (i32.ne (i32.const 2 (; RETURNED=2 | (0<<4) ;)) (local.get $ret)) + (then unreachable)) + (local.set $outsr (i32.load (local.get $retp))) + (if (i32.ne (i32.const 1) (local.get $outsr)) + (then unreachable)) + + ;; multiple write calls succeed until 12-byte buffer is filled + (i64.store (i32.const 16) (i64.const 0x0123456789abcdef)) + (local.set $ret (call $stream.write (local.get $insw) (i32.const 16) (i32.const 8))) + (if (i32.ne (i32.const 0x80) (local.get $ret)) + (then unreachable)) + (local.set $ret (call $stream.write (local.get $insw) (i32.const 16) (i32.const 8))) + (if (i32.ne (i32.const 0x40) (local.get $ret)) + (then unreachable)) + + ;; start a blocking write with a 12-byte buffer + (i64.store (i32.const 16) (i64.const 0xfedcba9876543210)) + (i32.store (i32.const 24) (i32.const 0x76543210)) + (local.set $ret (call $stream.write (local.get $insw) (i32.const 16) (i32.const 12))) + (if (i32.ne (i32.const -1 (; BLOCKED ;)) (local.get $ret)) + (then unreachable)) + + ;; wait for transform to read our write and close all the streams + (local.set $ws (call $waitable-set.new)) + (call $waitable.join (local.get $insw) (local.get $ws)) + (local.set $event_code (call $waitable-set.wait (local.get $ws) (i32.const 0))) + (local.set $index (i32.load (i32.const 0))) + (local.set $payload (i32.load (i32.const 4))) + + ;; confirm the write and the closed stream + (if (i32.ne (local.get $event_code) (i32.const 3 (; STREAM_WRITE ;))) + (then unreachable)) + (if (i32.ne (local.get $index) (local.get $insw)) + (then unreachable)) + (if (i32.ne (local.get $payload) (i32.const 0xc1 (; CLOSED=1 | (12 << 4) ;) (; TODO: currently returns 0xc0 ;))) + (then unreachable)) + + (call $stream.close-writable (local.get $insw)) + (call $stream.close-readable (local.get $outsr)) + + ;; return 42 to the top-level test harness + (i32.const 42) + ) + ) + (type $ST (stream u8)) + (canon waitable.join (core func $waitable.join)) + (canon waitable-set.new (core func $waitable-set.new)) + (canon waitable-set.wait (memory $memory "mem") (core func $waitable-set.wait)) + (canon stream.new $ST (core func $stream.new)) + (canon stream.read $ST async (memory $memory "mem") (core func $stream.read)) + (canon stream.write $ST async (memory $memory "mem") (core func $stream.write)) + (canon stream.close-readable $ST (core func $stream.close-readable)) + (canon stream.close-writable $ST (core func $stream.close-writable)) + (canon lower (func $transform) async (memory $memory "mem") (core func $transform')) + (core instance $dm (instantiate $DM (with "" (instance + (export "mem" (memory $memory "mem")) + (export "waitable.join" (func $waitable.join)) + (export "waitable-set.new" (func $waitable-set.new)) + (export "waitable-set.wait" (func $waitable-set.wait)) + (export "stream.new" (func $stream.new)) + (export "stream.read" (func $stream.read)) + (export "stream.write" (func $stream.write)) + (export "stream.close-readable" (func $stream.close-readable)) + (export "stream.close-writable" (func $stream.close-writable)) + (export "transform" (func $transform')) + )))) + (func (export "run") (result u32) (canon lift (core func $dm "run"))) + ) + + (instance $c (instantiate $C)) + (instance $d (instantiate $D (with "transform" (func $c "transform")))) + (func (export "run") (alias export $d "run")) +) +(assert_return (invoke "run") (u32.const 42)) diff --git a/test/async/passing-resources.wast b/test/async/passing-resources.wast new file mode 100644 index 00000000..39741264 --- /dev/null +++ b/test/async/passing-resources.wast @@ -0,0 +1,176 @@ +;; This test contains two components, $Producer and $Consumer. +;; $Producer.run drives the test and calls $Producer.start-stream to create +;; a stream and attempt to write 2 owned handles. $Producer.run then reads +;; just 1 element. The test finishes by confirming that $Consumer owns the +;; first resource, $Producer (still) owns the second resource, and $Producer +;; traps if it attempts to access the index of the first resource. +(component + (component $Producer + (core module $Memory (memory (export "mem") 1)) + (core instance $memory (instantiate $Memory)) + (core module $Core + (import "" "mem" (memory 1)) + (import "" "resource.new" (func $resource.new (param i32) (result i32))) + (import "" "resource.rep" (func $resource.rep (param i32) (result i32))) + (import "" "stream.new" (func $stream.new (result i64))) + (import "" "stream.write" (func $stream.write (param i32 i32 i32) (result i32))) + (import "" "stream.cancel-write" (func $stream.cancel-write (param i32) (result i32))) + (import "" "stream.close-writable" (func $stream.close-writable (param i32))) + + (global $ws (mut i32) (i32.const 0)) + (global $res1 (mut i32) (i32.const 0)) + (global $res2 (mut i32) (i32.const 0)) + + (func $start-stream (export "start-stream") (result i32) + (local $ret i32) (local $ret64 i64) + (local $rs i32) + + ;; create a new stream, return the readable end to the caller + (local.set $ret64 (call $stream.new)) + (global.set $ws (i32.wrap_i64 (i64.shr_u (local.get $ret64) (i64.const 32)))) + (local.set $rs (i32.wrap_i64 (local.get $ret64))) + + ;; create two resources and write them into a buffer to pass to stream.write + (global.set $res1 (call $resource.new (i32.const 50))) + (global.set $res2 (call $resource.new (i32.const 51))) + (i32.store (i32.const 8) (global.get $res1)) + (i32.store (i32.const 12) (global.get $res2)) + + ;; start a write which will block + (local.set $ret (call $stream.write (global.get $ws) (i32.const 8) (i32.const 2))) + (if (i32.ne (i32.const -1 (; BLOCKED ;)) (local.get $ret)) + (then unreachable)) + + ;; check that this instance still owns both resources (ownership has not + ;; yet been transferred). + (if (i32.ne (i32.const 50) (call $resource.rep (global.get $res1))) + (then unreachable)) + (if (i32.ne (i32.const 51) (call $resource.rep (global.get $res2))) + (then unreachable)) + + (local.get $rs) + ) + (func $cancel-write (export "cancel-write") + (local $ret i32) + + ;; cancel the write, confirming that the first element was transferred + (local.set $ret (call $stream.cancel-write (global.get $ws))) + (if (i32.ne (i32.const 0x11 (; CLOSED=1 | (1 << 4) ;)) (local.get $ret)) + (then unreachable)) + + ;; we still own $res2 + (if (i32.ne (i32.const 51) (call $resource.rep (global.get $res2))) + (then unreachable)) + + (call $stream.close-writable (global.get $ws)) + ) + (func $R.foo (export "R.foo") (param $rep i32) (result i32) + (i32.add (local.get $rep) (i32.const 50)) + ) + (func $fail-accessing-res1 (export "fail-accessing-res1") + ;; boom + (call $resource.rep (global.get $res1)) + unreachable + ) + ) + (type $R (resource (rep i32))) + (type $ST (stream (own $R))) + (canon resource.new $R (core func $resource.new)) + (canon resource.rep $R (core func $resource.rep)) + (canon stream.new $ST (core func $stream.new)) + (canon stream.write $ST async (memory $memory "mem") (core func $stream.write)) + (canon stream.cancel-write $ST (core func $stream.cancel-write)) + (canon stream.close-writable $ST (core func $stream.close-writable)) + (core instance $core (instantiate $Core (with "" (instance + (export "mem" (memory $memory "mem")) + (export "resource.new" (func $resource.new)) + (export "resource.rep" (func $resource.rep)) + (export "stream.new" (func $stream.new)) + (export "stream.write" (func $stream.write)) + (export "stream.cancel-write" (func $stream.cancel-write)) + (export "stream.close-writable" (func $stream.close-writable)) + )))) + (export $R' "R" (type $R)) + (func (export "[method]R.foo") (param "self" (borrow $R')) (result u32) (canon lift (core func $core "R.foo"))) + (func (export "start-stream") (result (stream (own $R'))) (canon lift (core func $core "start-stream"))) + (func (export "cancel-write") (canon lift (core func $core "cancel-write"))) + (func (export "fail-accessing-res1") (canon lift (core func $core "fail-accessing-res1"))) + ) + + (component $Consumer + (import "producer" (instance $producer + (export $R "R" (type (sub resource))) + (export "[method]R.foo" (func (param "self" (borrow $R)) (result u32))) + (export "start-stream" (func (result (stream (own $R))))) + (export "cancel-write" (func)) + )) + + (core module $Memory (memory (export "mem") 1)) + (core instance $memory (instantiate $Memory)) + (core module $Core + (import "" "mem" (memory 1)) + (import "" "stream.read" (func $stream.read (param i32 i32 i32) (result i32))) + (import "" "stream.close-readable" (func $stream.close-readable (param i32))) + (import "" "R.foo" (func $R.foo (param i32) (result i32))) + (import "" "start-stream" (func $start-stream (result i32))) + (import "" "cancel-write" (func $cancel-write)) + + (func $run (export "run") (result i32) + (local $ret i32) (local $rs i32) + (local $res1 i32) + + ;; get the readable end of a stream which has a pending write + (local.set $rs (call $start-stream)) + (if (i32.ne (local.get $rs) (i32.const 1)) + (then unreachable)) + + ;; read only 1 (of the 2 pending) elements, which won't block + (i64.store (i32.const 8) (i64.const 0xdeadbeefdeadbeef)) + (local.set $ret (call $stream.read (local.get $rs) (i32.const 8) (i32.const 1))) + (if (i32.ne (i32.const 0x10) (local.get $ret)) + (then unreachable)) + + ;; only 1 handle should have been transferred + (local.set $res1 (i32.load (i32.const 8))) + (if (i32.ne (i32.load (i32.const 12)) (i32.const 0xdeadbeef)) + (then unreachable)) + + ;; check that we got the first resource and it works + (local.set $ret (call $R.foo (local.get $res1))) + (if (i32.ne (i32.const 100) (local.get $ret)) + (then unreachable)) + + ;; close the stream and then let $C run and assert stuff + (call $stream.close-readable (local.get $rs)) + (call $cancel-write) + + (i32.const 42) + ) + ) + (alias export $producer "R" (type $R)) + (type $ST (stream (own $R))) + (canon stream.read $ST async (memory $memory "mem") (core func $stream.read)) + (canon stream.close-readable $ST (core func $stream.close-readable)) + (canon lower (func $producer "[method]R.foo") (core func $R.foo')) + (canon lower (func $producer "start-stream") (core func $start-stream')) + (canon lower (func $producer "cancel-write") (core func $cancel-write')) + (core instance $core (instantiate $Core (with "" (instance + (export "mem" (memory $memory "mem")) + (export "stream.read" (func $stream.read)) + (export "stream.close-readable" (func $stream.close-readable)) + (export "R.foo" (func $R.foo')) + (export "start-stream" (func $start-stream')) + (export "cancel-write" (func $cancel-write')) + )))) + (func (export "run") (result u32) (canon lift + (core func $core "run") + )) + ) + + (instance $producer (instantiate $Producer)) + (instance $consumer (instantiate $Consumer (with "producer" (instance $producer)))) + (func (export "run") (alias export $consumer "run")) + (func (export "fail-accessing-res1") (alias export $producer "fail-accessing-res1")) +) +(assert_return (invoke "run") (u32.const 42)) +(assert_trap (invoke "fail-accessing-res1") "unknown handle index 1") diff --git a/test/async/trap-on-reenter.wast b/test/async/trap-on-reenter.wast new file mode 100644 index 00000000..90df6405 --- /dev/null +++ b/test/async/trap-on-reenter.wast @@ -0,0 +1,65 @@ +;; This test creates an asynchronous recursive call stack: +;; $Parent --> $Child --> $Parent +;; That should trap when $Child tries to call $Parent. +(component $Parent + (core module $CoreInner + (memory (export "mem") 1) + (func (export "a") (result i32) + unreachable + ) + (func (export "a-cb") (param i32 i32 i32) (result i32) + unreachable + ) + ) + (core instance $core_inner (instantiate $CoreInner)) + (func $a (canon lift + (core func $core_inner "a") + async (callback (func $core_inner "a-cb")) + )) + + (component $Child + (import "a" (func $a)) + + (core module $Memory (memory (export "mem") 1)) + (core instance $memory (instantiate $Memory)) + + (core module $CoreChild + (import "" "a" (func $a (param i32 i32) (result i32))) + (func (export "b") (result i32) + (i32.const 1 (; YIELD ;)) + ) + (func (export "b-cb") (param i32 i32 i32) (result i32) + (call $a (i32.const 0xdeadbeef) (i32.const 0xdeadbeef)) + unreachable + ) + ) + (canon lower (func $a) async (memory $memory "mem") (core func $a')) + (core instance $core_child (instantiate $CoreChild (with "" (instance + (export "a" (func $a')) + )))) + (func (export "b") (canon lift + (core func $core_child "b") + async (callback (func $core_child "b-cb")) + )) + ) + (instance $child (instantiate $Child (with "a" (func $a)))) + + (core module $CoreOuter + (import "" "b" (func $b (param i32 i32) (result i32))) + (func $c (export "c") (result i32) + (i32.const 1 (; YIELD ;)) + ) + (func $c-cb (export "c-cb") (param i32 i32 i32) (result i32) + (call $b (i32.const 0xdeadbeef) (i32.const 0xdeadbeef)) + ) + ) + (canon lower (func $child "b") async (memory $core_inner "mem") (core func $b)) + (core instance $core_outer (instantiate $CoreOuter (with "" (instance + (export "b" (func $b)) + )))) + (func $c (export "c") (canon lift + (core func $core_outer "c") + async (callback (func $core_outer "c-cb")) + )) +) +(assert_trap (invoke "c") "wasm trap: cannot enter component instance") diff --git a/test/async/zero-length.wast b/test/async/zero-length.wast new file mode 100644 index 00000000..f46710e6 --- /dev/null +++ b/test/async/zero-length.wast @@ -0,0 +1,223 @@ +;; This example defines 3 nested components $Producer, $Consumer and $Parent +;; $Parent imports $Consumer and $Producer, calling $Producer.produce, which +;; returns a stream that $Parent passes to $Consumer.consume. +;; $Producer and $Consumer both start by performing 0-length reads/writes to +;; detect when the other side is ready. Once signalled ready, a 4-byte +;; payload is written/read. +(component + (component $Producer + (core module $Memory (memory (export "mem") 1)) + (core instance $memory (instantiate $Memory)) + (core module $CoreProducer + (import "" "mem" (memory 1)) + (import "" "task.return" (func $task.return (param i32))) + (import "" "waitable.join" (func $waitable.join (param i32 i32))) + (import "" "waitable-set.new" (func $waitable-set.new (result i32))) + (import "" "stream.new" (func $stream.new (result i64))) + (import "" "stream.write" (func $stream.write (param i32 i32 i32) (result i32))) + (import "" "stream.close-writable" (func $stream.close-writable (param i32))) + + ;; $ws is waited on by 'produce' + (global $ws (mut i32) (i32.const 0)) + (func $start (global.set $ws (call $waitable-set.new))) + (start $start) + + ;; $outsw is written by 'produce' + (global $outsw (mut i32) (i32.const 0)) + (global $outbufp (mut i32) (i32.const 0x20)) + + (global $state (mut i32) (i32.const 0)) + + (func $produce (export "produce") (result i32) + (local $ret i32) (local $ret64 i64) (local $outsr i32) + + ;; create a new stream r/w pair $outsr/$outsw + (local.set $ret64 (call $stream.new)) + (local.set $outsr (i32.wrap_i64 (local.get $ret64))) + (global.set $outsw (i32.wrap_i64 (i64.shr_u (local.get $ret64) (i64.const 32)))) + + ;; return the readable end of the stream to the caller + (call $task.return (local.get $outsr)) + + ;; initiate a zero-length write + (local.set $ret (call $stream.write (global.get $outsw) (i32.const 0xdeadbeef) (i32.const 0))) + (if (i32.ne (i32.const -1) (local.get $ret)) + (then unreachable)) + + ;; wait for the stream.write to complete + (call $waitable.join (global.get $outsw) (global.get $ws)) + (i32.or (i32.const 2 (; WAIT ;)) (i32.shl (global.get $ws) (i32.const 4))) + ) + (func $produce_cb (export "produce_cb") (param $event_code i32) (param $index i32) (param $payload i32) (result i32) + (local $ret i32) + + ;; confirm we're getting a write for the stream $outsw + (if (i32.ne (local.get $event_code) (i32.const 3 (; STREAM_WRITE ;))) + (then unreachable)) + (if (i32.ne (local.get $index) (global.get $outsw)) + (then unreachable)) + + ;; the first call to produce_cb: + (if (i32.eq (global.get $state) (i32.const 0)) (then + ;; confirm we're seeing the zero-length write complete + (if (i32.ne (local.get $payload) (i32.const 0 (; COMPLETED=0 | (0 << 4) ;))) + (then unreachable)) + + ;; issue an async non-zero-length write which should block per spec + (i32.store (i32.const 0) (i32.const 0x12345678)) + (local.set $ret (call $stream.write (global.get $outsw) (i32.const 0) (i32.const 4))) + (if (i32.ne (i32.const -1 (; BLOCKED ;)) (local.get $ret)) + (then unreachable)) + + (global.set $state (i32.const 1)) + + ;; wait on $ws + (return (i32.or (i32.const 2 (; WAIT ;)) (i32.shl (global.get $ws) (i32.const 4)))) + )) + + ;; the second call to produce_cb: + (if (i32.eq (global.get $state) (i32.const 1)) (then + ;; confirm we're seeing the non-zero-length write complete + (if (i32.ne (local.get $payload) (i32.const 0x40 (; COMPLETED=0 | (4 << 4) ;))) + (then unreachable)) + + (call $stream.close-writable (global.get $outsw)) + (return (i32.const 0 (; EXIT ;))) + )) + + unreachable + ) + ) + (type $ST (stream u8)) + (canon task.return (result $ST) (core func $task.return)) + (canon waitable.join (core func $waitable.join)) + (canon waitable-set.new (core func $waitable-set.new)) + (canon stream.new $ST (core func $stream.new)) + (canon stream.write $ST async (memory $memory "mem") (core func $stream.write)) + (canon stream.close-writable $ST (core func $stream.close-writable)) + (core instance $core_producer (instantiate $CoreProducer (with "" (instance + (export "mem" (memory $memory "mem")) + (export "task.return" (func $task.return)) + (export "waitable.join" (func $waitable.join)) + (export "waitable-set.new" (func $waitable-set.new)) + (export "stream.new" (func $stream.new)) + (export "stream.write" (func $stream.write)) + (export "stream.close-writable" (func $stream.close-writable)) + )))) + (func (export "produce") (result (stream u8)) (canon lift + (core func $core_producer "produce") + async (callback (func $core_producer "produce_cb")) + )) + ) + + (component $Consumer + (core module $Memory (memory (export "mem") 1)) + (core instance $memory (instantiate $Memory)) + (core module $CoreConsumer + (import "" "mem" (memory 1)) + (import "" "task.return" (func $task.return (param i32))) + (import "" "waitable.join" (func $waitable.join (param i32 i32))) + (import "" "waitable-set.new" (func $waitable-set.new (result i32))) + (import "" "stream.read" (func $stream.read (param i32 i32 i32) (result i32))) + (import "" "stream.close-readable" (func $stream.close-readable (param i32))) + + ;; $ws is waited on by 'consume' + (global $ws (mut i32) (i32.const 0)) + (func $start (global.set $ws (call $waitable-set.new))) + (start $start) + + ;; $insr is read by 'consume' + (global $insr (mut i32) (i32.const 0)) + (global $inbufp (mut i32) (i32.const 0x20)) + + (func $consume (export "consume") (param $insr i32) (result i32) + (local $ret i32) + (global.set $insr (local.get $insr)) + + ;; initiate a zero-length read which will also block (even though there is + ;; a pending write, b/c the pending write is 0-length, per spec) + (local.set $ret (call $stream.read (global.get $insr) (i32.const 0xdeadbeef) (i32.const 0))) + (if (i32.ne (i32.const -1) (local.get $ret)) + (then unreachable)) + + ;; wait for the stream.read to complete + (call $waitable.join (global.get $insr) (global.get $ws)) + (i32.or (i32.const 2 (; WAIT ;)) (i32.shl (global.get $ws) (i32.const 4))) + ) + (func $consume_cb (export "consume_cb") (param $event_code i32) (param $index i32) (param $payload i32) (result i32) + (local $ret i32) + + ;; confirm we're seeing the zero-length read complete + (if (i32.ne (local.get $event_code) (i32.const 2 (; STREAM_READ ;))) + (then unreachable)) + (if (i32.ne (local.get $index) (global.get $insr)) + (then unreachable)) + (if (i32.ne (local.get $payload) (i32.const 0 (; COMPLETED=0 | (0 << 4) ;))) + (then unreachable)) + + ;; perform a non-zero-length read which should succeed without blocking + (local.set $ret (call $stream.read (global.get $insr) (i32.const 0) (i32.const 100))) + (if (i32.ne (i32.const 0x40 (; (4 << 4) | COMPLETED=0 ;)) (local.get $ret)) + (then unreachable)) + (local.set $ret (i32.load (i32.const 0))) + (if (i32.ne (i32.const 0x12345678) (local.get $ret)) + (then unreachable)) + + (call $stream.close-readable (global.get $insr)) + + ;; return 42 to the top-level assert_return + (call $task.return (i32.const 42)) + (i32.const 0 (; EXIT ;)) + ) + ) + (type $ST (stream u8)) + (canon task.return (result u32) (core func $task.return)) + (canon waitable.join (core func $waitable.join)) + (canon waitable-set.new (core func $waitable-set.new)) + (canon stream.read $ST async (memory $memory "mem") (core func $stream.read)) + (canon stream.close-readable $ST (core func $stream.close-readable)) + (core instance $core_consumer (instantiate $CoreConsumer (with "" (instance + (export "mem" (memory $memory "mem")) + (export "task.return" (func $task.return)) + (export "waitable.join" (func $waitable.join)) + (export "waitable-set.new" (func $waitable-set.new)) + (export "stream.read" (func $stream.read)) + (export "stream.close-readable" (func $stream.close-readable)) + )))) + (func (export "consume") (param "in" (stream u8)) (result u32) (canon lift + (core func $core_consumer "consume") + async (callback (func $core_consumer "consume_cb")) + )) + ) + + (component $Parent + (import "produce" (func $produce (result (stream u8)))) + (import "consume" (func $consume (param "in" (stream u8)) (result u32))) + + (core module $CoreParent + (import "" "produce" (func $produce (result i32))) + (import "" "consume" (func $consume (param i32) (result i32))) + (memory 1) + (func $run (export "run") (result i32) + (call $consume (call $produce)) + ) + ) + + (canon lower (func $produce) (core func $produce')) + (canon lower (func $consume) (core func $consume')) + (core instance $core_parent (instantiate $CoreParent (with "" (instance + (export "produce" (func $produce')) + (export "consume" (func $consume')) + )))) + (func (export "run") (result u32) (canon lift (core func $core_parent "run"))) + ) + + (instance $producer (instantiate $Producer)) + (instance $consumer (instantiate $Consumer)) + (instance $parent (instantiate $Parent + (with "produce" (func $producer "produce")) + (with "consume" (func $consumer "consume")) + )) + (func (export "run") (alias export $parent "run")) +) +(assert_return (invoke "run") (u32.const 42)) diff --git a/test/resources/multiple-resources.wast b/test/resources/multiple-resources.wast new file mode 100644 index 00000000..d76e964e --- /dev/null +++ b/test/resources/multiple-resources.wast @@ -0,0 +1,170 @@ +;; This test has two components $C and $D where $D imports and calls $C +;; $C implements two resource types imported and used by $D +;; $D creates instances of both resource types, uses them and destroys them +(component + (component $C + (core module $Indirect + (table (export "ftbl") 2 funcref) + (type $FT (func (param i32))) + (func (export "R1-dtor") (param i32) + (call_indirect (type $FT) (local.get 0) (i32.const 0)) + ) + (func (export "R2-dtor") (param i32) + (call_indirect (type $FT) (local.get 0) (i32.const 1)) + ) + ) + (core instance $indirect (instantiate $Indirect)) + (type $R1' (resource (rep i32) (dtor (func $indirect "R1-dtor")))) + (type $R2' (resource (rep i32) (dtor (func $indirect "R2-dtor")))) + (export $R1 "R1" (type $R1')) + (export $R2 "R2" (type $R2')) + (canon resource.new $R1' (core func $R1.resource.new)) + (canon resource.new $R2' (core func $R2.resource.new)) + + (core module $CM + (import "" "ftbl" (table 1 funcref)) + (import "" "R1.resource.new" (func $R1.resource.new (param i32) (result i32))) + (import "" "R2.resource.new" (func $R2.resource.new (param i32) (result i32))) + (memory 1) + (global $num-live-R1 (mut i32) (i32.const 0)) + (global $num-live-R2 (mut i32) (i32.const 0)) + + ;; constructors + (func $make-R1 (export "make-R1") (result i32) + (local $h i32) + (global.set $num-live-R1 (i32.add (global.get $num-live-R1) (i32.const 1))) + (call $R1.resource.new (i32.add (i32.const 0x40) (global.get $num-live-R1))) + ) + (func $make-R2 (export "make-R2") (result i32) + (local $h i32) + (global.set $num-live-R2 (i32.add (global.get $num-live-R2) (i32.const 1))) + (call $R2.resource.new (i32.add (i32.const 0x80) (global.get $num-live-R2))) + ) + + ;; accessors + (func $get-rep-R1 (export "get-rep-R1") (param $rep i32) (result i32) + (local.get $rep) + ) + (func $get-rep-R2 (export "get-rep-R2") (param $rep i32) (result i32) + (local.get $rep) + ) + + ;; destructors + (func $R1-dtor (param $rep i32) + (if (i32.or (i32.lt_u (local.get $rep) (i32.const 0x41)) (i32.gt_u (local.get $rep) (i32.const 0x42))) + (then unreachable)) + (if (i32.eqz (global.get $num-live-R1)) + (then unreachable)) + (global.set $num-live-R1 (i32.sub (global.get $num-live-R1) (i32.const 1))) + ) + (func $R2-dtor (param $rep i32) + (if (i32.or (i32.lt_u (local.get $rep) (i32.const 0x81)) (i32.gt_u (local.get $rep) (i32.const 0x82))) + (then unreachable)) + (if (i32.eqz (global.get $num-live-R2)) + (then unreachable)) + (global.set $num-live-R2 (i32.sub (global.get $num-live-R2) (i32.const 1))) + ) + (func $num-live (export "num-live") (result i32) + (i32.add (global.get $num-live-R1) (global.get $num-live-R2)) + ) + (elem (i32.const 0) $R1-dtor $R2-dtor) + ) + (core instance $cm (instantiate $CM (with "" (instance + (export "ftbl" (table $indirect "ftbl")) + (export "R1.resource.new" (func $R1.resource.new)) + (export "R2.resource.new" (func $R2.resource.new)) + )))) + (func $make-R1 (export "make-R1") (result (own $R1)) (canon lift (core func $cm "make-R1"))) + (func $make-R2 (export "make-R2") (result (own $R2)) (canon lift (core func $cm "make-R2"))) + (func $get-rep-R1 (export "get-rep-R1") (param "r" (borrow $R1)) (result u32) (canon lift (core func $cm "get-rep-R1"))) + (func $get-rep-R2 (export "get-rep-R2") (param "r" (borrow $R2)) (result u32) (canon lift (core func $cm "get-rep-R2"))) + (func (export "num-live") (result u32) (canon lift (core func $cm "num-live"))) + ) + + (component $D + (import "c" (instance $c + (export $R1 "R1" (type (; $R1 ;) (sub resource))) + (export $R2 "R2" (type (; $R1 ;) (sub resource))) + (export "make-R1" (func (result (own $R1)))) + (export "make-R2" (func (result (own $R2)))) + (export "get-rep-R1" (func (param "r" (borrow $R1)) (result u32))) + (export "get-rep-R2" (func (param "r" (borrow $R2)) (result u32))) + (export "num-live" (func (result u32))) + )) + (core module $DM + (import "" "R1.resource.drop" (func $R1.resource.drop (param i32))) + (import "" "R2.resource.drop" (func $R2.resource.drop (param i32))) + (import "" "make-R1" (func $make-R1 (result i32))) + (import "" "make-R2" (func $make-R2 (result i32))) + (import "" "get-rep-R1" (func $get-rep-R1 (param i32) (result i32))) + (import "" "get-rep-R2" (func $get-rep-R2 (param i32) (result i32))) + (import "" "num-live" (func $num-live (result i32))) + (memory 1) + + (func $run (export "run") (result i32) + (local $ret i32) + (local $h1 i32) (local $h2 i32) (local $h3 i32) (local $h4 i32) + + ;; create 4 resources + (local.set $h1 (call $make-R1)) + (if (i32.ne (i32.const 1) (local.get $h1)) + (then unreachable)) + (local.set $h2 (call $make-R2)) + (if (i32.ne (i32.const 2) (local.get $h2)) + (then unreachable)) + (local.set $h3 (call $make-R1)) + (if (i32.ne (i32.const 3) (local.get $h3)) + (then unreachable)) + (local.set $h4 (call $make-R2)) + (if (i32.ne (i32.const 4) (local.get $h4)) + (then unreachable)) + (if (i32.ne (i32.const 4) (call $num-live)) + (then unreachable)) + + ;; use and destroy resources + (if (i32.ne (i32.const 0x81) (call $get-rep-R2 (local.get $h2))) + (then unreachable)) + (call $R2.resource.drop (local.get $h2)) + (if (i32.ne (i32.const 0x41) (call $get-rep-R1 (local.get $h1))) + (then unreachable)) + (call $R1.resource.drop (local.get $h1)) + (if (i32.ne (i32.const 0x82) (call $get-rep-R2 (local.get $h4))) + (then unreachable)) + (call $R2.resource.drop (local.get $h4)) + (if (i32.ne (i32.const 0x42) (call $get-rep-R1 (local.get $h3))) + (then unreachable)) + (call $R1.resource.drop (local.get $h3)) + + ;; everything should be destroyed + (if (i32.ne (i32.const 0) (call $num-live)) + (then unreachable)) + + (i32.const 42) + ) + ) + (alias export $c "R1" (type $R1)) + (alias export $c "R2" (type $R2)) + (canon resource.drop $R1 (core func $R1.resource.drop)) + (canon resource.drop $R2 (core func $R2.resource.drop)) + (canon lower (func $c "make-R1") (core func $make-R1')) + (canon lower (func $c "make-R2") (core func $make-R2')) + (canon lower (func $c "get-rep-R1") (core func $get-rep-R1')) + (canon lower (func $c "get-rep-R2") (core func $get-rep-R2')) + (canon lower (func $c "num-live") (core func $num-live')) + (core instance $dm (instantiate $DM (with "" (instance + (export "R1.resource.drop" (func $R1.resource.drop)) + (export "R2.resource.drop" (func $R2.resource.drop)) + (export "make-R1" (func $make-R1')) + (export "make-R2" (func $make-R2')) + (export "get-rep-R1" (func $get-rep-R1')) + (export "get-rep-R2" (func $get-rep-R2')) + (export "num-live" (func $num-live')) + )))) + (func (export "run") (result u32) (canon lift (core func $dm "run"))) + ) + + (instance $c (instantiate $C)) + (instance $d (instantiate $D (with "c" (instance $c)))) + (func (export "run") (alias export $d "run")) +) +(assert_return (invoke "run") (u32.const 42)) diff --git a/test/values/strings.wast b/test/values/strings.wast new file mode 100644 index 00000000..0628effc --- /dev/null +++ b/test/values/strings.wast @@ -0,0 +1,39 @@ +(component + (core module $M + (memory (export "mem") 1) + (func (export "f1") (result i32) + (i32.store (i32.const 0) (i32.const 8)) + (i32.store (i32.const 4) (i32.const 1)) + (i32.store8 (i32.const 8) (i32.const 97)) + (i32.const 0) + ) + (func (export "f2") (result i32) + (i32.store (i32.const 0) (i32.const 8)) + (i32.store (i32.const 4) (i32.const 14)) + (i64.store (i32.const 8) (i64.const 0xb8_ef_ba_98_e2_83_98_e2)) + (i32.store (i32.const 16) (i32.const 0xe3_b6_c3_8f)) + (i32.store16 (i32.const 20) (i32.const 0x84_83)) + (i32.const 0) + ) + ;; TODO: so many cases left to test, everyone feel free to fill in... + ) + (core instance $m (instantiate $M)) + (func (export "f1") (result string) (canon lift (core func $m "f1") (memory $m "mem"))) + (func (export "f2") (result string) (canon lift (core func $m "f2") (memory $m "mem"))) +) +(assert_return (invoke "f1") (str.const "a")) +(assert_return (invoke "f2") (str.const "☃☺️öツ")) + +(component + (core module $M + (memory (export "mem") 1) + (func (export "f") (result i32) + (i32.store (i32.const 0) (i32.const 0xdeadbeef)) + (i32.store (i32.const 4) (i32.const 0)) + (i32.const 0) + ) + ) + (core instance $m (instantiate $M)) + (func (export "f") (result string) (canon lift (core func $m "f") (memory $m "mem"))) +) +(assert_trap (invoke "f") "string pointer/length out of bounds of memory")