summaryrefslogtreecommitdiff
path: root/src/testdir/test_channel.py
blob: f1d774fd11436e9ab65a1f5b31a269760126a2d7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
#!/usr/bin/python
#
# Server that will accept connections from a Vim channel.
# Run this server and then in Vim you can open the channel:
#  :let handle = ch_open('localhost:8765', 'json')
#
# Then Vim can send requests to the server:
#  :let response = ch_sendexpr(handle, 'hello!')
#
# See ":help channel-demo" in Vim.
#
# This requires Python 2.6 or later.

from __future__ import print_function
import json
import socket
import sys
import threading

try:
    # Python 3
    import socketserver
except ImportError:
    # Python 2
    import SocketServer as socketserver

class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):

    def handle(self):
        print("=== socket opened ===")
        while True:
            try:
                received = self.request.recv(4096).decode('utf-8')
            except socket.error:
                print("=== socket error ===")
                break
            except IOError:
                print("=== socket closed ===")
                break
            if received == '':
                print("=== socket closed ===")
                break
            print("received: {}".format(received))

            # We may receive two messages at once. Take the part up to the
            # matching "]" (recognized by finding "][").
            todo = received
            while todo != '':
                splitidx = todo.find('][')
                if splitidx < 0:
                     used = todo
                     todo = ''
                else:
                     used = todo[:splitidx + 1]
                     todo = todo[splitidx + 1:]
                if used != received:
                    print("using: {}".format(used))

                try:
                    decoded = json.loads(used)
                except ValueError:
                    print("json decoding failed")
                    decoded = [-1, '']

                # Send a response if the sequence number is positive.
                if decoded[0] >= 0:
                    if decoded[1] == 'hello!':
                        # simply send back a string
                        response = "got it"
                    elif decoded[1] == 'make change':
                        # Send two ex commands at the same time, before replying to
                        # the request.
                        cmd = '["ex","call append(\\"$\\",\\"added1\\")"]'
                        cmd += '["ex","call append(\\"$\\",\\"added2\\")"]'
                        print("sending: {}".format(cmd))
                        self.request.sendall(cmd.encode('utf-8'))
                        response = "ok"
                    elif decoded[1] == 'eval-works':
                        # Send an eval request.  We ignore the response.
                        cmd = '["eval","\\"foo\\" . 123", -1]'
                        print("sending: {}".format(cmd))
                        self.request.sendall(cmd.encode('utf-8'))
                        response = "ok"
                    elif decoded[1] == 'eval-fails':
                        # Send an eval request that will fail.
                        cmd = '["eval","xxx", -2]'
                        print("sending: {}".format(cmd))
                        self.request.sendall(cmd.encode('utf-8'))
                        response = "ok"
                    elif decoded[1] == 'eval-result':
                        # Send back the last received eval result.
                        response = last_eval
                    elif decoded[1] == '!quit!':
                        # we're done
                        sys.exit(0)
                    elif decoded[1] == '!crash!':
                        # Crash!
                        42 / 0
                    else:
                        response = "what?"

                    encoded = json.dumps([decoded[0], response])
                    print("sending: {}".format(encoded))
                    self.request.sendall(encoded.encode('utf-8'))

                # Negative numbers are used for "eval" responses.
                elif decoded[0] < 0:
                    last_eval = decoded

class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    pass

if __name__ == "__main__":
    HOST, PORT = "localhost", 0

    server = ThreadedTCPServer((HOST, PORT), ThreadedTCPRequestHandler)
    ip, port = server.server_address

    # Start a thread with the server -- that thread will then start one
    # more thread for each request
    server_thread = threading.Thread(target=server.serve_forever)

    # Exit the server thread when the main thread terminates
    server_thread.daemon = True
    server_thread.start()

    # Write the port number in Xportnr, so that the test knows it.
    f = open("Xportnr", "w")
    f.write("{}".format(port))
    f.close()

    # Block here
    print("Listening on port {}".format(port))
    server.serve_forever()