]> git.sesse.net Git - letsencrypt-varnish-plugin/blob - varnish.py
Switch import calls to certbot
[letsencrypt-varnish-plugin] / varnish.py
1 """Plugin for authenticating directly out of Varnish's VCL."""
2 import os
3 import logging
4 import re
5 import subprocess
6
7 import zope.component
8 import zope.interface
9 from zope.interface import implementer, provider
10
11 from acme import challenges
12
13 from certbot import errors
14 from certbot import interfaces
15 from certbot.plugins import common
16
17
18 logger = logging.getLogger(__name__)
19
20 def vcl_recv_line(achall):
21     # Don't bother checking for the right host, we could be coming in through a redirect.
22     return 'if (req.url == "/%s/%s") { return (synth(999, "Challenge")); }  # Added by letsencrypt Varnish plugin for authentication\n' % (achall.URI_ROOT_PATH, achall.chall.encode("token"))
23
24 def vcl_synth_line(validation):
25     return 'if (resp.status == 999) { set resp.status = 200; set resp.http.Content-Type = "text/plain"; synthetic("%s"); return (deliver); }  # Added by letsencrypt Varnish plugin for authentication\n' % (validation);
26
27 @implementer(interfaces.IAuthenticator)
28 @provider(interfaces.IPluginFactory)
29 class Authenticator(common.Plugin):
30     hidden = True
31
32     description = "Manual configuration, authentication via Varnish VCL"
33
34     def __init__(self, *args, **kwargs):
35         super(Authenticator, self).__init__(*args, **kwargs)
36         self._httpd = None
37
38     def prepare(self):  # pylint: disable=missing-docstring,no-self-use
39         pass  # pragma: no cover
40
41     def more_info(self):  # pylint: disable=missing-docstring,no-self-use
42         return ("")
43
44     def get_chall_pref(self, domain):
45         # pylint: disable=missing-docstring,no-self-use,unused-argument
46         return [challenges.HTTP01]
47
48     def perform(self, achalls):  # pylint: disable=missing-docstring
49         responses = []
50         for achall in achalls:
51             responses.append(self._perform_single(achall))
52         return responses
53
54     def _perform_single(self, achall):
55         # same path for each challenge response would be easier for
56         # users, but will not work if multiple domains point at the
57         # same server: default command doesn't support virtual hosts
58         response, validation = achall.response_and_validation()
59
60         with open("/etc/varnish/default.vcl") as vcl:
61             content = vcl.readlines()
62
63         self.old_content = content
64
65         found_vcl_recv = False
66         found_vcl_synth = False
67         new_content = []
68         for line in content:
69             if re.search("# Added by letsencrypt Varnish plugin", line):
70                 # Don't include this line; left by a previous run.
71                 continue
72             new_content.append(line)
73             if re.search("^sub\s+vcl_recv\s+{\s*$", line):
74                 new_content.append(vcl_recv_line(achall))
75                 found_vcl_recv = True
76             if re.search("^sub\s+vcl_synth\s+{\s*$", line):
77                 new_content.append(vcl_synth_line(validation))
78                 found_vcl_synth = True
79
80         if not found_vcl_recv:
81             new_content.append("sub vcl_recv {  # Added by letsencrypt Varnish plugin for authentication\n")
82             new_content.append(vcl_recv_line(achall))
83             new_content.append("}  # Added by letsencrypt Varnish plugin for authentication\n")
84
85         if not found_vcl_synth:
86             new_content.append("sub vcl_synth {  # Added by letsencrypt Varnish plugin for authentication\n")
87             new_content.append(vcl_synth_line(validation))
88             new_content.append("}  # Added by letsencrypt Varnish plugin for authentication\n")
89
90         with open("/etc/varnish/default.vcl", "w") as vcl:
91             vcl.writelines(new_content)
92
93         subprocess.call(["systemctl", "reload", "varnish.service"])
94
95         if response.simple_verify(
96                 achall.chall, achall.domain,
97                 achall.account_key.public_key(), self.config.http01_port):
98             return response
99         else:
100             logger.error(
101                 "Self-verify of challenge failed, authorization abandoned!")
102             return None
103
104     def cleanup(self, achalls):
105         # pylint: disable=missing-docstring,no-self-use,unused-argument
106         with open("/etc/varnish/default.vcl", "w") as vcl:
107             vcl.writelines(self.old_content)
108
109         subprocess.call(["systemctl", "reload", "varnish.service"])