ops
This commit is contained in:
Executable
+516
@@ -0,0 +1,516 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
from tornado.httputil import (
|
||||
url_concat, parse_multipart_form_data, HTTPHeaders, format_timestamp,
|
||||
HTTPServerRequest, parse_request_start_line, parse_cookie, qs_to_qsl,
|
||||
HTTPInputError,
|
||||
)
|
||||
from tornado.escape import utf8, native_str
|
||||
from tornado.util import PY3
|
||||
from tornado.log import gen_log
|
||||
from tornado.testing import ExpectLog
|
||||
from tornado.test.util import unittest
|
||||
|
||||
import copy
|
||||
import datetime
|
||||
import logging
|
||||
import pickle
|
||||
import time
|
||||
|
||||
if PY3:
|
||||
import urllib.parse as urllib_parse
|
||||
else:
|
||||
import urlparse as urllib_parse
|
||||
|
||||
|
||||
class TestUrlConcat(unittest.TestCase):
|
||||
def test_url_concat_no_query_params(self):
|
||||
url = url_concat(
|
||||
"https://localhost/path",
|
||||
[('y', 'y'), ('z', 'z')],
|
||||
)
|
||||
self.assertEqual(url, "https://localhost/path?y=y&z=z")
|
||||
|
||||
def test_url_concat_encode_args(self):
|
||||
url = url_concat(
|
||||
"https://localhost/path",
|
||||
[('y', '/y'), ('z', 'z')],
|
||||
)
|
||||
self.assertEqual(url, "https://localhost/path?y=%2Fy&z=z")
|
||||
|
||||
def test_url_concat_trailing_q(self):
|
||||
url = url_concat(
|
||||
"https://localhost/path?",
|
||||
[('y', 'y'), ('z', 'z')],
|
||||
)
|
||||
self.assertEqual(url, "https://localhost/path?y=y&z=z")
|
||||
|
||||
def test_url_concat_q_with_no_trailing_amp(self):
|
||||
url = url_concat(
|
||||
"https://localhost/path?x",
|
||||
[('y', 'y'), ('z', 'z')],
|
||||
)
|
||||
self.assertEqual(url, "https://localhost/path?x=&y=y&z=z")
|
||||
|
||||
def test_url_concat_trailing_amp(self):
|
||||
url = url_concat(
|
||||
"https://localhost/path?x&",
|
||||
[('y', 'y'), ('z', 'z')],
|
||||
)
|
||||
self.assertEqual(url, "https://localhost/path?x=&y=y&z=z")
|
||||
|
||||
def test_url_concat_mult_params(self):
|
||||
url = url_concat(
|
||||
"https://localhost/path?a=1&b=2",
|
||||
[('y', 'y'), ('z', 'z')],
|
||||
)
|
||||
self.assertEqual(url, "https://localhost/path?a=1&b=2&y=y&z=z")
|
||||
|
||||
def test_url_concat_no_params(self):
|
||||
url = url_concat(
|
||||
"https://localhost/path?r=1&t=2",
|
||||
[],
|
||||
)
|
||||
self.assertEqual(url, "https://localhost/path?r=1&t=2")
|
||||
|
||||
def test_url_concat_none_params(self):
|
||||
url = url_concat(
|
||||
"https://localhost/path?r=1&t=2",
|
||||
None,
|
||||
)
|
||||
self.assertEqual(url, "https://localhost/path?r=1&t=2")
|
||||
|
||||
def test_url_concat_with_frag(self):
|
||||
url = url_concat(
|
||||
"https://localhost/path#tab",
|
||||
[('y', 'y')],
|
||||
)
|
||||
self.assertEqual(url, "https://localhost/path?y=y#tab")
|
||||
|
||||
def test_url_concat_multi_same_params(self):
|
||||
url = url_concat(
|
||||
"https://localhost/path",
|
||||
[('y', 'y1'), ('y', 'y2')],
|
||||
)
|
||||
self.assertEqual(url, "https://localhost/path?y=y1&y=y2")
|
||||
|
||||
def test_url_concat_multi_same_query_params(self):
|
||||
url = url_concat(
|
||||
"https://localhost/path?r=1&r=2",
|
||||
[('y', 'y')],
|
||||
)
|
||||
self.assertEqual(url, "https://localhost/path?r=1&r=2&y=y")
|
||||
|
||||
def test_url_concat_dict_params(self):
|
||||
url = url_concat(
|
||||
"https://localhost/path",
|
||||
dict(y='y'),
|
||||
)
|
||||
self.assertEqual(url, "https://localhost/path?y=y")
|
||||
|
||||
|
||||
class QsParseTest(unittest.TestCase):
|
||||
|
||||
def test_parsing(self):
|
||||
qsstring = "a=1&b=2&a=3"
|
||||
qs = urllib_parse.parse_qs(qsstring)
|
||||
qsl = list(qs_to_qsl(qs))
|
||||
self.assertIn(('a', '1'), qsl)
|
||||
self.assertIn(('a', '3'), qsl)
|
||||
self.assertIn(('b', '2'), qsl)
|
||||
|
||||
|
||||
class MultipartFormDataTest(unittest.TestCase):
|
||||
def test_file_upload(self):
|
||||
data = b"""\
|
||||
--1234
|
||||
Content-Disposition: form-data; name="files"; filename="ab.txt"
|
||||
|
||||
Foo
|
||||
--1234--""".replace(b"\n", b"\r\n")
|
||||
args = {}
|
||||
files = {}
|
||||
parse_multipart_form_data(b"1234", data, args, files)
|
||||
file = files["files"][0]
|
||||
self.assertEqual(file["filename"], "ab.txt")
|
||||
self.assertEqual(file["body"], b"Foo")
|
||||
|
||||
def test_unquoted_names(self):
|
||||
# quotes are optional unless special characters are present
|
||||
data = b"""\
|
||||
--1234
|
||||
Content-Disposition: form-data; name=files; filename=ab.txt
|
||||
|
||||
Foo
|
||||
--1234--""".replace(b"\n", b"\r\n")
|
||||
args = {}
|
||||
files = {}
|
||||
parse_multipart_form_data(b"1234", data, args, files)
|
||||
file = files["files"][0]
|
||||
self.assertEqual(file["filename"], "ab.txt")
|
||||
self.assertEqual(file["body"], b"Foo")
|
||||
|
||||
def test_special_filenames(self):
|
||||
filenames = ['a;b.txt',
|
||||
'a"b.txt',
|
||||
'a";b.txt',
|
||||
'a;"b.txt',
|
||||
'a";";.txt',
|
||||
'a\\"b.txt',
|
||||
'a\\b.txt',
|
||||
]
|
||||
for filename in filenames:
|
||||
logging.debug("trying filename %r", filename)
|
||||
data = """\
|
||||
--1234
|
||||
Content-Disposition: form-data; name="files"; filename="%s"
|
||||
|
||||
Foo
|
||||
--1234--""" % filename.replace('\\', '\\\\').replace('"', '\\"')
|
||||
data = utf8(data.replace("\n", "\r\n"))
|
||||
args = {}
|
||||
files = {}
|
||||
parse_multipart_form_data(b"1234", data, args, files)
|
||||
file = files["files"][0]
|
||||
self.assertEqual(file["filename"], filename)
|
||||
self.assertEqual(file["body"], b"Foo")
|
||||
|
||||
def test_non_ascii_filename(self):
|
||||
data = b"""\
|
||||
--1234
|
||||
Content-Disposition: form-data; name="files"; filename="ab.txt"; filename*=UTF-8''%C3%A1b.txt
|
||||
|
||||
Foo
|
||||
--1234--""".replace(b"\n", b"\r\n")
|
||||
args = {}
|
||||
files = {}
|
||||
parse_multipart_form_data(b"1234", data, args, files)
|
||||
file = files["files"][0]
|
||||
self.assertEqual(file["filename"], u"áb.txt")
|
||||
self.assertEqual(file["body"], b"Foo")
|
||||
|
||||
def test_boundary_starts_and_ends_with_quotes(self):
|
||||
data = b'''\
|
||||
--1234
|
||||
Content-Disposition: form-data; name="files"; filename="ab.txt"
|
||||
|
||||
Foo
|
||||
--1234--'''.replace(b"\n", b"\r\n")
|
||||
args = {}
|
||||
files = {}
|
||||
parse_multipart_form_data(b'"1234"', data, args, files)
|
||||
file = files["files"][0]
|
||||
self.assertEqual(file["filename"], "ab.txt")
|
||||
self.assertEqual(file["body"], b"Foo")
|
||||
|
||||
def test_missing_headers(self):
|
||||
data = b'''\
|
||||
--1234
|
||||
|
||||
Foo
|
||||
--1234--'''.replace(b"\n", b"\r\n")
|
||||
args = {}
|
||||
files = {}
|
||||
with ExpectLog(gen_log, "multipart/form-data missing headers"):
|
||||
parse_multipart_form_data(b"1234", data, args, files)
|
||||
self.assertEqual(files, {})
|
||||
|
||||
def test_invalid_content_disposition(self):
|
||||
data = b'''\
|
||||
--1234
|
||||
Content-Disposition: invalid; name="files"; filename="ab.txt"
|
||||
|
||||
Foo
|
||||
--1234--'''.replace(b"\n", b"\r\n")
|
||||
args = {}
|
||||
files = {}
|
||||
with ExpectLog(gen_log, "Invalid multipart/form-data"):
|
||||
parse_multipart_form_data(b"1234", data, args, files)
|
||||
self.assertEqual(files, {})
|
||||
|
||||
def test_line_does_not_end_with_correct_line_break(self):
|
||||
data = b'''\
|
||||
--1234
|
||||
Content-Disposition: form-data; name="files"; filename="ab.txt"
|
||||
|
||||
Foo--1234--'''.replace(b"\n", b"\r\n")
|
||||
args = {}
|
||||
files = {}
|
||||
with ExpectLog(gen_log, "Invalid multipart/form-data"):
|
||||
parse_multipart_form_data(b"1234", data, args, files)
|
||||
self.assertEqual(files, {})
|
||||
|
||||
def test_content_disposition_header_without_name_parameter(self):
|
||||
data = b"""\
|
||||
--1234
|
||||
Content-Disposition: form-data; filename="ab.txt"
|
||||
|
||||
Foo
|
||||
--1234--""".replace(b"\n", b"\r\n")
|
||||
args = {}
|
||||
files = {}
|
||||
with ExpectLog(gen_log, "multipart/form-data value missing name"):
|
||||
parse_multipart_form_data(b"1234", data, args, files)
|
||||
self.assertEqual(files, {})
|
||||
|
||||
def test_data_after_final_boundary(self):
|
||||
# The spec requires that data after the final boundary be ignored.
|
||||
# http://www.w3.org/Protocols/rfc1341/7_2_Multipart.html
|
||||
# In practice, some libraries include an extra CRLF after the boundary.
|
||||
data = b"""\
|
||||
--1234
|
||||
Content-Disposition: form-data; name="files"; filename="ab.txt"
|
||||
|
||||
Foo
|
||||
--1234--
|
||||
""".replace(b"\n", b"\r\n")
|
||||
args = {}
|
||||
files = {}
|
||||
parse_multipart_form_data(b"1234", data, args, files)
|
||||
file = files["files"][0]
|
||||
self.assertEqual(file["filename"], "ab.txt")
|
||||
self.assertEqual(file["body"], b"Foo")
|
||||
|
||||
|
||||
class HTTPHeadersTest(unittest.TestCase):
|
||||
def test_multi_line(self):
|
||||
# Lines beginning with whitespace are appended to the previous line
|
||||
# with any leading whitespace replaced by a single space.
|
||||
# Note that while multi-line headers are a part of the HTTP spec,
|
||||
# their use is strongly discouraged.
|
||||
data = """\
|
||||
Foo: bar
|
||||
baz
|
||||
Asdf: qwer
|
||||
\tzxcv
|
||||
Foo: even
|
||||
more
|
||||
lines
|
||||
""".replace("\n", "\r\n")
|
||||
headers = HTTPHeaders.parse(data)
|
||||
self.assertEqual(headers["asdf"], "qwer zxcv")
|
||||
self.assertEqual(headers.get_list("asdf"), ["qwer zxcv"])
|
||||
self.assertEqual(headers["Foo"], "bar baz,even more lines")
|
||||
self.assertEqual(headers.get_list("foo"), ["bar baz", "even more lines"])
|
||||
self.assertEqual(sorted(list(headers.get_all())),
|
||||
[("Asdf", "qwer zxcv"),
|
||||
("Foo", "bar baz"),
|
||||
("Foo", "even more lines")])
|
||||
|
||||
def test_malformed_continuation(self):
|
||||
# If the first line starts with whitespace, it's a
|
||||
# continuation line with nothing to continue, so reject it
|
||||
# (with a proper error).
|
||||
data = " Foo: bar"
|
||||
self.assertRaises(HTTPInputError, HTTPHeaders.parse, data)
|
||||
|
||||
def test_unicode_newlines(self):
|
||||
# Ensure that only \r\n is recognized as a header separator, and not
|
||||
# the other newline-like unicode characters.
|
||||
# Characters that are likely to be problematic can be found in
|
||||
# http://unicode.org/standard/reports/tr13/tr13-5.html
|
||||
# and cpython's unicodeobject.c (which defines the implementation
|
||||
# of unicode_type.splitlines(), and uses a different list than TR13).
|
||||
newlines = [
|
||||
u'\u001b', # VERTICAL TAB
|
||||
u'\u001c', # FILE SEPARATOR
|
||||
u'\u001d', # GROUP SEPARATOR
|
||||
u'\u001e', # RECORD SEPARATOR
|
||||
u'\u0085', # NEXT LINE
|
||||
u'\u2028', # LINE SEPARATOR
|
||||
u'\u2029', # PARAGRAPH SEPARATOR
|
||||
]
|
||||
for newline in newlines:
|
||||
# Try the utf8 and latin1 representations of each newline
|
||||
for encoding in ['utf8', 'latin1']:
|
||||
try:
|
||||
try:
|
||||
encoded = newline.encode(encoding)
|
||||
except UnicodeEncodeError:
|
||||
# Some chars cannot be represented in latin1
|
||||
continue
|
||||
data = b'Cookie: foo=' + encoded + b'bar'
|
||||
# parse() wants a native_str, so decode through latin1
|
||||
# in the same way the real parser does.
|
||||
headers = HTTPHeaders.parse(
|
||||
native_str(data.decode('latin1')))
|
||||
expected = [('Cookie', 'foo=' +
|
||||
native_str(encoded.decode('latin1')) + 'bar')]
|
||||
self.assertEqual(
|
||||
expected, list(headers.get_all()))
|
||||
except Exception:
|
||||
gen_log.warning("failed while trying %r in %s",
|
||||
newline, encoding)
|
||||
raise
|
||||
|
||||
def test_optional_cr(self):
|
||||
# Both CRLF and LF should be accepted as separators. CR should not be
|
||||
# part of the data when followed by LF, but it is a normal char
|
||||
# otherwise (or should bare CR be an error?)
|
||||
headers = HTTPHeaders.parse(
|
||||
'CRLF: crlf\r\nLF: lf\nCR: cr\rMore: more\r\n')
|
||||
self.assertEqual(sorted(headers.get_all()),
|
||||
[('Cr', 'cr\rMore: more'),
|
||||
('Crlf', 'crlf'),
|
||||
('Lf', 'lf'),
|
||||
])
|
||||
|
||||
def test_copy(self):
|
||||
all_pairs = [('A', '1'), ('A', '2'), ('B', 'c')]
|
||||
h1 = HTTPHeaders()
|
||||
for k, v in all_pairs:
|
||||
h1.add(k, v)
|
||||
h2 = h1.copy()
|
||||
h3 = copy.copy(h1)
|
||||
h4 = copy.deepcopy(h1)
|
||||
for headers in [h1, h2, h3, h4]:
|
||||
# All the copies are identical, no matter how they were
|
||||
# constructed.
|
||||
self.assertEqual(list(sorted(headers.get_all())), all_pairs)
|
||||
for headers in [h2, h3, h4]:
|
||||
# Neither the dict or its member lists are reused.
|
||||
self.assertIsNot(headers, h1)
|
||||
self.assertIsNot(headers.get_list('A'), h1.get_list('A'))
|
||||
|
||||
def test_pickle_roundtrip(self):
|
||||
headers = HTTPHeaders()
|
||||
headers.add('Set-Cookie', 'a=b')
|
||||
headers.add('Set-Cookie', 'c=d')
|
||||
headers.add('Content-Type', 'text/html')
|
||||
pickled = pickle.dumps(headers)
|
||||
unpickled = pickle.loads(pickled)
|
||||
self.assertEqual(sorted(headers.get_all()), sorted(unpickled.get_all()))
|
||||
self.assertEqual(sorted(headers.items()), sorted(unpickled.items()))
|
||||
|
||||
def test_setdefault(self):
|
||||
headers = HTTPHeaders()
|
||||
headers['foo'] = 'bar'
|
||||
# If a value is present, setdefault returns it without changes.
|
||||
self.assertEqual(headers.setdefault('foo', 'baz'), 'bar')
|
||||
self.assertEqual(headers['foo'], 'bar')
|
||||
# If a value is not present, setdefault sets it for future use.
|
||||
self.assertEqual(headers.setdefault('quux', 'xyzzy'), 'xyzzy')
|
||||
self.assertEqual(headers['quux'], 'xyzzy')
|
||||
self.assertEqual(sorted(headers.get_all()), [('Foo', 'bar'), ('Quux', 'xyzzy')])
|
||||
|
||||
def test_string(self):
|
||||
headers = HTTPHeaders()
|
||||
headers.add("Foo", "1")
|
||||
headers.add("Foo", "2")
|
||||
headers.add("Foo", "3")
|
||||
headers2 = HTTPHeaders.parse(str(headers))
|
||||
self.assertEquals(headers, headers2)
|
||||
|
||||
|
||||
class FormatTimestampTest(unittest.TestCase):
|
||||
# Make sure that all the input types are supported.
|
||||
TIMESTAMP = 1359312200.503611
|
||||
EXPECTED = 'Sun, 27 Jan 2013 18:43:20 GMT'
|
||||
|
||||
def check(self, value):
|
||||
self.assertEqual(format_timestamp(value), self.EXPECTED)
|
||||
|
||||
def test_unix_time_float(self):
|
||||
self.check(self.TIMESTAMP)
|
||||
|
||||
def test_unix_time_int(self):
|
||||
self.check(int(self.TIMESTAMP))
|
||||
|
||||
def test_struct_time(self):
|
||||
self.check(time.gmtime(self.TIMESTAMP))
|
||||
|
||||
def test_time_tuple(self):
|
||||
tup = tuple(time.gmtime(self.TIMESTAMP))
|
||||
self.assertEqual(9, len(tup))
|
||||
self.check(tup)
|
||||
|
||||
def test_datetime(self):
|
||||
self.check(datetime.datetime.utcfromtimestamp(self.TIMESTAMP))
|
||||
|
||||
|
||||
# HTTPServerRequest is mainly tested incidentally to the server itself,
|
||||
# but this tests the parts of the class that can be tested in isolation.
|
||||
class HTTPServerRequestTest(unittest.TestCase):
|
||||
def test_default_constructor(self):
|
||||
# All parameters are formally optional, but uri is required
|
||||
# (and has been for some time). This test ensures that no
|
||||
# more required parameters slip in.
|
||||
HTTPServerRequest(uri='/')
|
||||
|
||||
def test_body_is_a_byte_string(self):
|
||||
requets = HTTPServerRequest(uri='/')
|
||||
self.assertIsInstance(requets.body, bytes)
|
||||
|
||||
def test_repr_does_not_contain_headers(self):
|
||||
request = HTTPServerRequest(uri='/', headers={'Canary': 'Coal Mine'})
|
||||
self.assertTrue('Canary' not in repr(request))
|
||||
|
||||
|
||||
class ParseRequestStartLineTest(unittest.TestCase):
|
||||
METHOD = "GET"
|
||||
PATH = "/foo"
|
||||
VERSION = "HTTP/1.1"
|
||||
|
||||
def test_parse_request_start_line(self):
|
||||
start_line = " ".join([self.METHOD, self.PATH, self.VERSION])
|
||||
parsed_start_line = parse_request_start_line(start_line)
|
||||
self.assertEqual(parsed_start_line.method, self.METHOD)
|
||||
self.assertEqual(parsed_start_line.path, self.PATH)
|
||||
self.assertEqual(parsed_start_line.version, self.VERSION)
|
||||
|
||||
|
||||
class ParseCookieTest(unittest.TestCase):
|
||||
# These tests copied from Django:
|
||||
# https://github.com/django/django/pull/6277/commits/da810901ada1cae9fc1f018f879f11a7fb467b28
|
||||
def test_python_cookies(self):
|
||||
"""
|
||||
Test cases copied from Python's Lib/test/test_http_cookies.py
|
||||
"""
|
||||
self.assertEqual(parse_cookie('chips=ahoy; vienna=finger'),
|
||||
{'chips': 'ahoy', 'vienna': 'finger'})
|
||||
# Here parse_cookie() differs from Python's cookie parsing in that it
|
||||
# treats all semicolons as delimiters, even within quotes.
|
||||
self.assertEqual(
|
||||
parse_cookie('keebler="E=mc2; L=\\"Loves\\"; fudge=\\012;"'),
|
||||
{'keebler': '"E=mc2', 'L': '\\"Loves\\"', 'fudge': '\\012', '': '"'}
|
||||
)
|
||||
# Illegal cookies that have an '=' char in an unquoted value.
|
||||
self.assertEqual(parse_cookie('keebler=E=mc2'), {'keebler': 'E=mc2'})
|
||||
# Cookies with ':' character in their name.
|
||||
self.assertEqual(parse_cookie('key:term=value:term'), {'key:term': 'value:term'})
|
||||
# Cookies with '[' and ']'.
|
||||
self.assertEqual(parse_cookie('a=b; c=[; d=r; f=h'),
|
||||
{'a': 'b', 'c': '[', 'd': 'r', 'f': 'h'})
|
||||
|
||||
def test_cookie_edgecases(self):
|
||||
# Cookies that RFC6265 allows.
|
||||
self.assertEqual(parse_cookie('a=b; Domain=example.com'),
|
||||
{'a': 'b', 'Domain': 'example.com'})
|
||||
# parse_cookie() has historically kept only the last cookie with the
|
||||
# same name.
|
||||
self.assertEqual(parse_cookie('a=b; h=i; a=c'), {'a': 'c', 'h': 'i'})
|
||||
|
||||
def test_invalid_cookies(self):
|
||||
"""
|
||||
Cookie strings that go against RFC6265 but browsers will send if set
|
||||
via document.cookie.
|
||||
"""
|
||||
# Chunks without an equals sign appear as unnamed values per
|
||||
# https://bugzilla.mozilla.org/show_bug.cgi?id=169091
|
||||
self.assertIn('django_language',
|
||||
parse_cookie('abc=def; unnamed; django_language=en').keys())
|
||||
# Even a double quote may be an unamed value.
|
||||
self.assertEqual(parse_cookie('a=b; "; c=d'), {'a': 'b', '': '"', 'c': 'd'})
|
||||
# Spaces in names and values, and an equals sign in values.
|
||||
self.assertEqual(parse_cookie('a b c=d e = f; gh=i'), {'a b c': 'd e = f', 'gh': 'i'})
|
||||
# More characters the spec forbids.
|
||||
self.assertEqual(parse_cookie('a b,c<>@:/[]?{}=d " =e,f g'),
|
||||
{'a b,c<>@:/[]?{}': 'd " =e,f g'})
|
||||
# Unicode characters. The spec only allows ASCII.
|
||||
self.assertEqual(parse_cookie('saint=André Bessette'),
|
||||
{'saint': native_str('André Bessette')})
|
||||
# Browsers don't send extra whitespace or semicolons in Cookie headers,
|
||||
# but parse_cookie() should parse whitespace the same way
|
||||
# document.cookie parses whitespace.
|
||||
self.assertEqual(parse_cookie(' = b ; ; = ; c = ; '), {'': 'b', 'c': ''})
|
||||
Reference in New Issue
Block a user