|
3 | 3 | import tempfile |
4 | 4 | import unittest |
5 | 5 |
|
6 | | -from flask import Flask, render_template |
| 6 | +from flask import Flask, make_response, render_template, request |
7 | 7 | from flask_caching import Cache |
8 | 8 |
|
9 | 9 | from flask_compress import Compress, DictCache |
@@ -91,6 +91,10 @@ def test_quality_level_default_zstd(self): |
91 | 91 | """Tests COMPRESS_ZSTD_LEVEL default value is correctly set.""" |
92 | 92 | self.assertEqual(self.app.config["COMPRESS_ZSTD_LEVEL"], 3) |
93 | 93 |
|
| 94 | + def test_mutate_weak_etags(self): |
| 95 | + """Tests COMPRESS_MUTATE_WEAK_ETAGS default value is correctly set.""" |
| 96 | + self.assertEqual(self.app.config["COMPRESS_MUTATE_WEAK_ETAGS"], True) |
| 97 | + |
94 | 98 |
|
95 | 99 | class InitTests(unittest.TestCase): |
96 | 100 | def setUp(self): |
@@ -570,5 +574,93 @@ def test_compression(self): |
570 | 574 | self.assertEqual(self.cache_key_calls, 2) |
571 | 575 |
|
572 | 576 |
|
| 577 | +class ETagTests(unittest.TestCase): |
| 578 | + def setUp(self): |
| 579 | + self.app = Flask(__name__) |
| 580 | + self.app.testing = True |
| 581 | + self.app.config["COMPRESS_ALGORITHM"] = ["gzip"] |
| 582 | + self.app.config["COMPRESS_MIN_SIZE"] = 1 |
| 583 | + |
| 584 | + Compress(self.app) |
| 585 | + |
| 586 | + @self.app.route("/strong/") |
| 587 | + def strong(): |
| 588 | + rv = make_response(render_template("large.html")) |
| 589 | + rv.set_etag("abc123", weak=False) |
| 590 | + return rv.make_conditional(request) |
| 591 | + |
| 592 | + @self.app.route("/weak/") |
| 593 | + def weak(): |
| 594 | + rv = make_response(render_template("large.html")) |
| 595 | + rv.set_etag("abc123", weak=True) |
| 596 | + return rv.make_conditional(request) |
| 597 | + |
| 598 | + def test_strong_etag_is_mutated_with_suffix_and_remains_strong(self): |
| 599 | + client = self.app.test_client() |
| 600 | + r = client.get("/strong/", headers=[("Accept-Encoding", "gzip")]) |
| 601 | + self.assertEqual(r.status_code, 200) |
| 602 | + self.assertEqual(r.headers.get("Content-Encoding"), "gzip") |
| 603 | + |
| 604 | + tag, is_weak = r.get_etag() |
| 605 | + self.assertFalse(is_weak) |
| 606 | + self.assertEqual(tag, "abc123:gzip") |
| 607 | + self.assertEqual(int(r.headers["Content-Length"]), len(r.data)) |
| 608 | + |
| 609 | + def test_weak_etag_is_mutated_when_flag_true(self): |
| 610 | + client = self.app.test_client() |
| 611 | + r = client.get("/weak/", headers=[("Accept-Encoding", "gzip")]) |
| 612 | + self.assertEqual(r.status_code, 200) |
| 613 | + self.assertEqual(r.headers.get("Content-Encoding"), "gzip") |
| 614 | + |
| 615 | + tag, is_weak = r.get_etag() |
| 616 | + self.assertTrue(is_weak) |
| 617 | + self.assertEqual(tag, "abc123:gzip") |
| 618 | + |
| 619 | + def test_strong_etag_is_mutated_when_flag_false(self): |
| 620 | + self.app.config["COMPRESS_MUTATE_WEAK_ETAGS"] = False |
| 621 | + client = self.app.test_client() |
| 622 | + r = client.get("/strong/", headers=[("Accept-Encoding", "gzip")]) |
| 623 | + self.assertEqual(r.status_code, 200) |
| 624 | + self.assertEqual(r.headers.get("Content-Encoding"), "gzip") |
| 625 | + |
| 626 | + tag, is_weak = r.get_etag() |
| 627 | + self.assertFalse(is_weak) |
| 628 | + # :gzip suffix when flag is False |
| 629 | + self.assertEqual(tag, "abc123:gzip") |
| 630 | + |
| 631 | + def test_weak_etag_is_preserved_when_flag_false(self): |
| 632 | + self.app.config["COMPRESS_MUTATE_WEAK_ETAGS"] = False |
| 633 | + client = self.app.test_client() |
| 634 | + r = client.get("/weak/", headers=[("Accept-Encoding", "gzip")]) |
| 635 | + self.assertEqual(r.status_code, 200) |
| 636 | + self.assertEqual(r.headers.get("Content-Encoding"), "gzip") |
| 637 | + |
| 638 | + tag, is_weak = r.get_etag() |
| 639 | + self.assertTrue(is_weak) |
| 640 | + # No :gzip suffix when flag is False |
| 641 | + self.assertEqual(tag, "abc123") |
| 642 | + |
| 643 | + def test_conditional_get_uses_strong_compressed_representation(self): |
| 644 | + client = self.app.test_client() |
| 645 | + r1 = client.get("/strong/", headers=[("Accept-Encoding", "gzip")]) |
| 646 | + |
| 647 | + r2 = client.get( |
| 648 | + "/strong/", |
| 649 | + headers=[("Accept-Encoding", "gzip"), ("If-None-Match", r1.headers["ETag"])], |
| 650 | + ) |
| 651 | + # This is the current behavior that breaks strong etags due rewrite |
| 652 | + self.assertEqual(r2.status_code, 200) |
| 653 | + |
| 654 | + def test_conditional_get_uses_weak_compressed_representation(self): |
| 655 | + self.app.config["COMPRESS_MUTATE_WEAK_ETAGS"] = False |
| 656 | + client = self.app.test_client() |
| 657 | + r1 = client.get("/weak/", headers=[("Accept-Encoding", "gzip")]) |
| 658 | + r2 = client.get( |
| 659 | + "/weak/", |
| 660 | + headers=[("Accept-Encoding", "gzip"), ("If-None-Match", r1.headers["ETag"])], |
| 661 | + ) |
| 662 | + self.assertEqual(r2.status_code, 304) |
| 663 | + |
| 664 | + |
573 | 665 | if __name__ == "__main__": |
574 | 666 | unittest.main() |
0 commit comments