summaryrefslogtreecommitdiff
path: root/Postman/Postman-Mail/mailgun/vendor/clue/stream-filter/src/CallbackFilter.php
blob: 710940b6e6f8ec175adbdfe54afeb671dac64e19 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
<?php

namespace Clue\StreamFilter;

use Closure;
use Exception;
use InvalidArgumentException;
use php_user_filter;
use ReflectionException;
use ReflectionFunction;
use ReflectionMethod;

/**
 * User-space stream filter that hands every chunk of stream data to a callback
 * and passes the callback's return value on as the filtered data.
 *
 * The callback is read from the filter parameters (`$this->params`) when the
 * filter is created. If the callback can be invoked without any arguments, it
 * is additionally invoked once without arguments to signal the end of the
 * filter, see filter() and onClose().
 *
 * @internal
 * @see append()
 * @see prepend()
 */
class CallbackFilter extends php_user_filter
{
    /** @var callable|null callback taken from the filter params, reset to null by onClose() */
    private $callback;

    /** @var bool true before onCreate() has run and again once the filter has ended */
    private $closed = true;

    /** @var bool true while the callback still has to receive its argument-less end signal */
    private $supportsClose = false;

    /**
     * Validates the callback given as filter parameter and prepares the filter.
     *
     * @return bool always true
     * @throws InvalidArgumentException if the filter parameter is not callable
     */
    #[\ReturnTypeWillChange]
    public function onCreate()
    {
        $this->closed = false;

        if (!is_callable($this->params)) {
            throw new InvalidArgumentException('No valid callback parameter given to stream_filter_(append|prepend)');
        }
        $this->callback = $this->params;

        // callback supports end event if it accepts invocation without arguments
        $this->supportsClose = $this->acceptsEndSignal($this->callback);

        return true;
    }

    /**
     * Marks the filter as closed and sends the end signal to the callback if
     * it supports one and has not received it yet. Any data returned by the
     * callback at this point is discarded.
     *
     * @return void
     */
    #[\ReturnTypeWillChange]
    public function onClose()
    {
        $this->closed = true;

        // callback supports closing and is not already closed
        if ($this->supportsClose) {
            $this->supportsClose = false;
            // invoke without argument to signal end and discard resulting buffer
            try {
                call_user_func($this->callback);
            } catch (Exception $ignored) {
                // this might be called during engine shutdown, so it's not safe
                // to raise any errors or exceptions here (which is why no
                // warning is triggered for the ignored exception)
            }
        }

        $this->callback = null;
    }

    /**
     * Passes the data of the input brigade through the callback and appends
     * the result to the output brigade.
     *
     * @param resource $in       input bucket brigade
     * @param resource $out      output bucket brigade
     * @param int      $consumed incremented by the number of bytes taken from $in
     * @param bool     $closing  whether this is the last invocation for the filter
     * @return int PSFS_FEED_ME if the filter has already ended, PSFS_ERR_FATAL
     *     if the callback threw an Exception (reported as E_USER_WARNING),
     *     PSFS_PASS_ON otherwise
     */
    #[\ReturnTypeWillChange]
    public function filter($in, $out, &$consumed, $closing)
    {
        // concatenate whole buffer from input brigade
        $data = '';
        while ($bucket = stream_bucket_make_writeable($in)) {
            $consumed += $bucket->datalen;
            $data .= $bucket->data;
        }

        // skip processing callback that already ended
        if ($this->closed) {
            return PSFS_FEED_ME;
        }

        // only invoke filter function if buffer is not empty
        // this may skip flushing a closing filter
        if ($data !== '') {
            try {
                $data = call_user_func($this->callback, $data);
            } catch (Exception $e) {
                // exception should mark filter as closed
                $this->onClose();
                trigger_error('Error invoking filter: ' . $e->getMessage(), E_USER_WARNING);

                return PSFS_ERR_FATAL;
            }
        }

        // mark filter as closed after processing closing chunk
        if ($closing) {
            $this->closed = true;

            // callback supports closing and is not already closed
            if ($this->supportsClose) {
                $this->supportsClose = false;

                // invoke without argument to signal end and append resulting buffer
                try {
                    $data .= call_user_func($this->callback);
                } catch (Exception $e) {
                    trigger_error('Error ending filter: ' . $e->getMessage(), E_USER_WARNING);

                    return PSFS_ERR_FATAL;
                }
            }
        }

        if ($data !== '') {
            // create a new bucket for writing the resulting buffer to the output brigade
            // reusing an existing bucket turned out to be bugged in some environments (ancient PHP versions and HHVM)
            $bucket = @stream_bucket_new($this->stream, $data);

            // legacy PHP versions (PHP < 5.4) do not support passing data from the event signal handler
            // because closing the stream invalidates the stream and its stream bucket brigade before
            // invoking the filter close handler.
            if ($bucket !== false) {
                stream_bucket_append($out, $bucket);
            }
        }

        return PSFS_PASS_ON;
    }

    /**
     * Checks whether the given callable can be invoked without any arguments.
     *
     * ReflectionFunction only understands closures and function names, so
     * array callables, "Class::method" strings and invokable objects are
     * inspected through ReflectionMethod instead.
     *
     * @param callable $callback callable that already passed is_callable()
     * @return bool true if the callable has no required parameters, false if
     *     it has any or if it cannot be reflected (e.g. a method that only
     *     exists through __call())
     */
    private function acceptsEndSignal($callback)
    {
        try {
            if (is_array($callback)) {
                $ref = new ReflectionMethod($callback[0], $callback[1]);
            } elseif (is_string($callback) && strpos($callback, '::') !== false) {
                list($class, $method) = explode('::', $callback, 2);
                $ref = new ReflectionMethod($class, $method);
            } elseif (is_object($callback) && !($callback instanceof Closure)) {
                $ref = new ReflectionMethod($callback, '__invoke');
            } else {
                $ref = new ReflectionFunction($callback);
            }
        } catch (ReflectionException $e) {
            // callable without a reflectable signature: never send the end signal
            return false;
        }

        return $ref->getNumberOfRequiredParameters() === 0;
    }
}