xref: /ext-fiber/examples/002-read-write.php (revision b5d6a96a)
1<?php
2
/**
 * Minimal event loop used to schedule callbacks around stream I/O.
 *
 * Two kinds of work can be registered:
 *  - deferred callbacks (see defer()), which are invoked on the next loop iteration;
 *  - one-shot read watchers (see read()), which are invoked once stream_select()
 *    reports their stream as readable and are then removed.
 */
class EventLoop
{
    /**
     * Next identifier to hand out. It is advanced with PHP's alphanumeric string
     * increment, so ids run 'a', 'b', ..., 'z', 'aa', ...
     */
    private string $nextId = 'a';

    /** @var array<string, callable> Callbacks queued for the next iteration, keyed by id. */
    private array $deferCallbacks = [];

    /** @var array<string, resource> Streams watched for readability, keyed by id. */
    private array $read = [];

    /** @var array<string, callable> Read callbacks, keyed by the id of the watched stream. */
    private array $streamCallbacks = [];

    /**
     * Runs the loop until there are neither deferred callbacks nor read watchers left.
     */
    public function run(): void
    {
        while (!empty($this->deferCallbacks) || !empty($this->read)) {
            // Swap the queue out before invoking anything, so that callbacks deferred
            // while this batch runs are executed on the next iteration, not this one.
            $defers = $this->deferCallbacks;
            $this->deferCallbacks = [];
            foreach ($defers as $defer) {
                $defer();
            }

            $this->select($this->read);
        }
    }

    /**
     * Waits for the given streams to become readable and invokes their callbacks.
     *
     * Blocks indefinitely when no deferred callbacks are pending; otherwise it only
     * polls, so the pending callbacks are not delayed.
     *
     * @param array<string, resource> $read Streams to watch, keyed by watcher id.
     */
    private function select(array $read): void
    {
        // On PHP 8, stream_select() throws a ValueError ("No stream arrays were passed")
        // when it is given no streams at all. With nothing to watch there is nothing to
        // wait for, so return and let run() process deferred callbacks or terminate.
        if (empty($read)) {
            return;
        }

        // null blocks until a stream is ready; 0 makes the call return immediately.
        // The same value is used for both the seconds and microseconds arguments.
        $timeout = empty($this->deferCallbacks) ? null : 0;

        // Only readability is of interest; the other two sets are passed by reference.
        $write = null;
        $except = null;

        // A falsy result covers both "no stream ready" (0) and failure (false).
        if (!stream_select($read, $write, $except, $timeout, $timeout)) {
            return;
        }

        // stream_select() has reduced $read to the streams that are ready.
        foreach ($read as $id => $resource) {
            $callback = $this->streamCallbacks[$id];
            // Watchers are one-shot: remove before invoking so the callback may re-register.
            unset($this->read[$id], $this->streamCallbacks[$id]);
            $callback($resource);
        }
    }

    /**
     * Queues a callback to be invoked on the next iteration of the loop.
     *
     * @param callable $callback Invoked without arguments.
     */
    public function defer(callable $callback): void
    {
        $id = $this->nextId++;
        $this->deferCallbacks[$id] = $callback;
    }

    /**
     * Registers a one-shot watcher that fires once the stream is readable.
     *
     * @param resource $resource Stream to watch.
     * @param callable $callback Invoked with the stream as its only argument.
     */
    public function read($resource, callable $callback): void
    {
        $id = $this->nextId++;
        $this->read[$id] = $resource;
        $this->streamCallbacks[$id] = $callback;
    }
}
49
// Create a pair of connected stream sockets. The address family depends on the
// platform: INET when PHP_OS starts with "win", UNIX otherwise.
$sockets = stream_socket_pair(
    stripos(PHP_OS, 'win') === 0 ? STREAM_PF_INET : STREAM_PF_UNIX,
    STREAM_SOCK_STREAM,
    STREAM_IPPROTO_IP
);

// stream_socket_pair() returns false on failure; stop here with a clear message
// instead of failing later when the missing streams are first used.
if ($sockets === false) {
    throw new RuntimeException('Failed to create a socket pair');
}

[$read, $write] = $sockets;

// Set streams to non-blocking mode.
stream_set_blocking($read, false);
stream_set_blocking($write, false);

$loop = new EventLoop();

// Read data in a separate fiber after checking if the stream is readable.
$fiber = new Fiber(function () use ($loop, $read): void {
    echo "Waiting for data...\n";

    // Register a read watcher that resumes this fiber, then hand control back to
    // whoever started or resumed it until the watcher fires.
    $fiber = Fiber::getCurrent();
    $loop->read($read, fn() => $fiber->resume());
    Fiber::suspend();

    // Execution continues here once the loop has reported the stream as readable.
    $data = fread($read, 8192);

    echo "Received data: ", $data, "\n";
});

// Start the fiber, which will suspend while waiting for a read event.
$loop->defer(fn() => $fiber->start());

// Defer writing data to an event loop callback.
$loop->defer(fn() => fwrite($write, "Hello, world!"));

// Run the event loop.
$loop->run();