1 import os
2 import base64
3 import datetime
4 import functools
5 from functools import wraps, partial
6
7 from netaddr import IPAddress, IPNetwork
8 import re
9 import flask
10 from flask import send_file
11
12 from urllib.parse import urlparse
13 from openid_teams.teams import TeamsRequest
14
15 from copr_common.enums import RoleEnum
16 from coprs import app
17 from coprs import db
18 from coprs import helpers
19 from coprs import models
20 from coprs import oid
21 from coprs.logic.complex_logic import ComplexLogic
22 from coprs.logic.users_logic import UsersLogic
23 from coprs.logic.coprs_logic import CoprsLogic
40
43 oidname_parse = urlparse(oidname)
44 if not oidname_parse.netloc:
45 return oidname
46 config_parse = urlparse(app.config["OPENID_PROVIDER_URL"])
47 return oidname_parse.netloc.replace(".{0}".format(config_parse.netloc), "")
48
62
67
80
81
82 -def page_not_found(message):
83 return flask.render_template("404.html", message=message), 404
84
88
91 """
92 :type message: str
93 :type err: CoprHttpException
94 """
95 return flask.render_template("_error.html",
96 message=message,
97 error_code=code,
98 error_title=title), code
99
100
101 server_error_handler = partial(generic_error, code=500, title="Internal Server Error")
102 bad_request_handler = partial(generic_error, code=400, title="Bad Request")
103
104 misc = flask.Blueprint("misc", __name__)
105
106
107 @misc.route(app.config['KRB5_LOGIN_BASEURI'] + "<name>/", methods=["GET"])
109 """
110 Handle the Kerberos authentication.
111
112 Note that if we are able to get here, either the user is authenticated
113 correctly, or apache is mis-configured and it does not perform KRB
114 authentication at all. Note also, even if that can be considered ugly, we
115 are reusing oid's get_next_url feature with kerberos login.
116 """
117
118
119 if flask.g.user is not None:
120 return flask.redirect(oid.get_next_url())
121
122 krb_config = app.config['KRB5_LOGIN']
123
124 found = None
125 for key in krb_config.keys():
126 if krb_config[key]['URI'] == name:
127 found = key
128 break
129
130 if not found:
131
132 return flask.render_template("404.html"), 404
133
134 if app.config["DEBUG"] and 'TEST_REMOTE_USER' in os.environ:
135
136 flask.request.environ['REMOTE_USER'] = os.environ['TEST_REMOTE_USER']
137
138 if 'REMOTE_USER' not in flask.request.environ:
139 nocred = "Kerberos authentication failed (no credentials provided)"
140 return flask.render_template("403.html", message=nocred), 403
141
142 krb_username = flask.request.environ['REMOTE_USER']
143 app.logger.debug("krb5 login attempt: " + krb_username)
144 username = krb_straighten_username(krb_username)
145 if not username:
146 message = "invalid krb5 username: " + krb_username
147 return flask.render_template("403.html", message=message), 403
148
149 krb_login = (
150 models.Krb5Login.query
151 .filter(models.Krb5Login.config_name == key)
152 .filter(models.Krb5Login.primary == username)
153 .first()
154 )
155 if krb_login:
156 flask.g.user = krb_login.user
157 flask.session['krb5_login'] = krb_login.user.name
158 flask.flash(u"Welcome, {0}".format(flask.g.user.name), "success")
159 return flask.redirect(oid.get_next_url())
160
161
162 user = models.User.query.filter(models.User.username == username).first()
163 if not user:
164
165 email = username + "@" + krb_config[key]['email_domain']
166 user = create_user_wrapper(username, email)
167 db.session.add(user)
168
169 krb_login = models.Krb5Login(user=user, primary=username, config_name=key)
170 db.session.add(krb_login)
171 db.session.commit()
172
173 flask.flash(u"Welcome, {0}".format(user.name), "success")
174 flask.g.user = user
175 flask.session['krb5_login'] = user.name
176 return flask.redirect(oid.get_next_url())
177
178
179 @misc.route("/login/", methods=["GET"])
180 @oid.loginhandler
181 -def login():
182 if not app.config['FAS_LOGIN']:
183 if app.config['KRB5_LOGIN']:
184 return krb5_login_redirect(next=oid.get_next_url())
185 flask.flash("No auth method available", "error")
186 return flask.redirect(flask.url_for("coprs_ns.coprs_show"))
187
188 if flask.g.user is not None:
189 return flask.redirect(oid.get_next_url())
190 else:
191
192 team_req = TeamsRequest(["_FAS_ALL_GROUPS_"])
193 return oid.try_login(app.config["OPENID_PROVIDER_URL"],
194 ask_for=["email", "timezone"],
195 extensions=[team_req])
196
234
235
236 @misc.route("/logout/")
237 -def logout():
238 flask.session.pop("openid", None)
239 flask.session.pop("krb5_login", None)
240 flask.flash(u"You were signed out")
241 return flask.redirect(oid.get_next_url())
242
245 @functools.wraps(f)
246 def decorated_function(*args, **kwargs):
247 token = None
248 apt_login = None
249 if "Authorization" in flask.request.headers:
250 base64string = flask.request.headers["Authorization"]
251 base64string = base64string.split()[1].strip()
252 userstring = base64.b64decode(base64string)
253 (apt_login, token) = userstring.decode("utf-8").split(":")
254 token_auth = False
255 if token and apt_login:
256 user = UsersLogic.get_by_api_login(apt_login).first()
257 if (user and user.api_token == token and
258 user.api_token_expiration >= datetime.date.today()):
259
260 if user.proxy and "username" in flask.request.form:
261 user = UsersLogic.get(flask.request.form["username"]).first()
262
263 token_auth = True
264 flask.g.user = user
265 if not token_auth:
266 url = 'https://' + app.config["PUBLIC_COPR_HOSTNAME"]
267 url = helpers.fix_protocol_for_frontend(url)
268
269 output = {
270 "output": "notok",
271 "error": "Login invalid/expired. Please visit {0}/api to get or renew your API token.".format(url),
272 }
273 jsonout = flask.jsonify(output)
274 jsonout.status_code = 401
275 return jsonout
276 return f(*args, **kwargs)
277 return decorated_function
278
281 krbc = app.config['KRB5_LOGIN']
282 for key in krbc:
283
284 return flask.redirect(flask.url_for("misc.krb5_login",
285 name=krbc[key]['URI'],
286 next=next))
287 flask.flash("Unable to pick krb5 login page", "error")
288 return flask.redirect(flask.url_for("coprs_ns.coprs_show"))
289
292 def view_wrapper(f):
293 @functools.wraps(f)
294 def decorated_function(*args, **kwargs):
295 if flask.g.user is None:
296 return flask.redirect(flask.url_for("misc.login",
297 next=flask.request.url))
298
299 if role == RoleEnum("admin") and not flask.g.user.admin:
300 flask.flash("You are not allowed to access admin section.")
301 return flask.redirect(flask.url_for("coprs_ns.coprs_show"))
302
303 return f(*args, **kwargs)
304 return decorated_function
305
306
307
308
309
310 if callable(role):
311 return view_wrapper(role)
312 else:
313 return view_wrapper
314
318 @functools.wraps(f)
319 def decorated_function(*args, **kwargs):
320 auth = flask.request.authorization
321 if not auth or auth.password != app.config["BACKEND_PASSWORD"]:
322 return "You have to provide the correct password\n", 401
323
324 return f(*args, **kwargs)
325 return decorated_function
326
329 @functools.wraps(f)
330 def decorated_function(*args, **kwargs):
331 ip_addr = IPAddress(flask.request.remote_addr)
332 accept_ranges = set(app.config.get("INTRANET_IPS", []))
333 accept_ranges.add("127.0.0.1")
334 if not any(ip_addr in IPNetwork(addr_or_net) for addr_or_net in accept_ranges):
335 return ("Stats can be update only from intranet hosts, "
336 "not {}, check config\n".format(flask.request.remote_addr)), 403
337
338 return f(*args, **kwargs)
339 return decorated_function
340
353 return wrapper
354
366 return wrapper
367
370 if not build:
371 return send_file("static/status_images/unknown.png",
372 mimetype='image/png')
373
374 if build.state in ["importing", "pending", "starting", "running"]:
375
376
377 response = send_file("static/status_images/in_progress.png",
378 mimetype='image/png')
379 response.headers['Cache-Control'] = 'no-cache'
380 return response
381
382 if build.state in ["succeeded", "skipped"]:
383 return send_file("static/status_images/succeeded.png",
384 mimetype='image/png')
385
386 if build.state == "failed":
387 return send_file("static/status_images/failed.png",
388 mimetype='image/png')
389
390 return send_file("static/status_images/unknown.png",
391 mimetype='image/png')
392