1 """Plugin for authenticating directly out of Varnish's VCL."""
10 from acme import challenges
12 from letsencrypt import errors
13 from letsencrypt import interfaces
14 from letsencrypt.plugins import common
17 logger = logging.getLogger(__name__)
19 def vcl_recv_line(achall):
20 # Don't bother checking for the right host, we could be coming in through a redirect.
21 return 'if (req.url == "/%s/%s") { return (synth(999, "Challenge")); } # Added by letsencrypt Varnish plugin for authentication\n' % (achall.URI_ROOT_PATH, achall.chall.encode("token"))
23 def vcl_synth_line(validation):
24 return 'if (resp.status == 999) { set resp.status = 200; set resp.http.Content-Type = "text/plain"; synthetic("%s"); return (deliver); } # Added by letsencrypt Varnish plugin for authentication\n' % (validation);
26 class Authenticator(common.Plugin):
27 zope.interface.implements(interfaces.IAuthenticator)
28 zope.interface.classProvides(interfaces.IPluginFactory)
31 description = "Manual configuration, authentication via Varnish VCL"
33 def __init__(self, *args, **kwargs):
34 super(Authenticator, self).__init__(*args, **kwargs)
37 def prepare(self): # pylint: disable=missing-docstring,no-self-use
38 pass # pragma: no cover
40 def more_info(self): # pylint: disable=missing-docstring,no-self-use
43 def get_chall_pref(self, domain):
44 # pylint: disable=missing-docstring,no-self-use,unused-argument
45 return [challenges.HTTP01]
47 def perform(self, achalls): # pylint: disable=missing-docstring
49 for achall in achalls:
50 responses.append(self._perform_single(achall))
53 def _perform_single(self, achall):
54 # same path for each challenge response would be easier for
55 # users, but will not work if multiple domains point at the
56 # same server: default command doesn't support virtual hosts
57 response, validation = achall.response_and_validation()
59 with open("/etc/varnish/default.vcl") as vcl:
60 content = vcl.readlines()
62 self.old_content = content
64 found_vcl_recv = False
65 found_vcl_synth = False
68 if re.search("# Added by letsencrypt Varnish plugin", line):
69 # Don't include this line; left by a previous run.
71 new_content.append(line)
72 if re.search("^sub\s+vcl_recv\s+{\s*$", line):
73 new_content.append(vcl_recv_line(achall))
75 if re.search("^sub\s+vcl_synth\s+{\s*$", line):
76 new_content.append(vcl_synth_line(validation))
77 found_vcl_synth = True
79 if not found_vcl_recv:
80 new_content.append("sub vcl_recv { # Added by letsencrypt Varnish plugin for authentication\n")
81 new_content.append(vcl_recv_line(achall))
82 new_content.append("} # Added by letsencrypt Varnish plugin for authentication\n")
84 if not found_vcl_synth:
85 new_content.append("sub vcl_synth { # Added by letsencrypt Varnish plugin for authentication\n")
86 new_content.append(vcl_synth_line(validation))
87 new_content.append("} # Added by letsencrypt Varnish plugin for authentication\n")
89 with open("/etc/varnish/default.vcl", "w") as vcl:
90 vcl.writelines(new_content)
92 subprocess.call(["systemctl", "reload", "varnish.service"])
94 if response.simple_verify(
95 achall.chall, achall.domain,
96 achall.account_key.public_key(), self.config.http01_port):
100 "Self-verify of challenge failed, authorization abandoned!")
103 def cleanup(self, achalls):
104 # pylint: disable=missing-docstring,no-self-use,unused-argument
105 with open("/etc/varnish/default.vcl", "w") as vcl:
106 vcl.writelines(self.old_content)
108 subprocess.call(["systemctl", "reload", "varnish.service"])