Coverage for zombie_nomnom_api/server.py: 100%

34 statements  

« prev     ^ index     » next       coverage.py v7.6.9, created at 2024-12-07 04:25 +0000

1from fastapi import FastAPI, HTTPException, Request 

2from importlib.metadata import version 

3 

4from fastapi.responses import JSONResponse 

5 

6from .graphql_app import graphql_app 

7from fastapi.middleware.cors import CORSMiddleware 

8from zombie_nomnom_api import configs 

9from zombie_nomnom_api.rest_app.authentication import ( 

10 get_verifier, 

11 token_auth_scheme, 

12) 

13 

14 

# Resolve the installed distribution's version for display in the API
# metadata and the /version endpoint.  Falls back to "dev" when the lookup
# fails for any reason (best-effort by design).
try:
    _version = version("zombie-nomnom-api")
except Exception:  # pragma: no cover
    # `except Exception` rather than a bare `except:` so that
    # KeyboardInterrupt / SystemExit raised during import still propagate.
    _version = "dev"

19 

# The ASGI application object; routes and middleware below attach to it.
fastapi_app = FastAPI(version=_version, title="Zombie Nom Nom API")

24 

# CORS policy is driven entirely by values read from the `configs` module.
fastapi_app.add_middleware(
    CORSMiddleware,
    allow_headers=configs.cors_headers,
    allow_methods=configs.cors_methods,
    allow_credentials=configs.cors_allow_credentials,
    allow_origins=configs.cors_origins,
)

32 

33 

@fastapi_app.middleware("http")
async def hydrate_user(
    request: Request,
    call_next,
):
    """Attach the verified user to ``request.state`` before handling.

    Flow:
      * ``token_auth_scheme`` raises ``HTTPException`` with detail
        ``"Not authenticated"`` -> the request is forwarded untouched
        (presumably the "no credentials supplied" case; confirm against
        the scheme's implementation).
      * Any other ``HTTPException`` from the scheme -> 401 JSON error.
      * The verifier's result has a truthy ``"status"`` entry -> that
        result is returned as the body of a 403 response.
      * Otherwise the result is stored on ``request.state.user`` and the
        request continues down the stack.
    """
    try:
        bearer = await token_auth_scheme(request)
    except HTTPException as auth_error:
        if auth_error.detail != "Not authenticated":
            return JSONResponse(
                content={"status": "error", "message": "Not Authenticated"},
                status_code=401,
            )
        # No usable credentials: continue without setting a user.
        return await call_next(request)

    verified = get_verifier().verify(bearer.credentials)

    if not verified.get("status"):
        request.state.user = verified
        return await call_next(request)

    # A "status" entry in the verifier result is treated as a rejection.
    return JSONResponse(content=verified, status_code=403)

54 

55 

@fastapi_app.get("/healthz")
def healthz():
    """Return a fixed payload for ``GET /healthz``."""
    payload = {"o": "k"}
    return payload

59 

60 

@fastapi_app.get("/version")
def version():
    """Return the module-level ``_version`` string for ``GET /version``."""
    # NOTE(review): this definition rebinds the module-level name `version`,
    # which the file also imports from importlib.metadata.  Any code that
    # runs after this point and calls `version(...)` gets this handler.
    payload = {"version": _version}
    return payload

64 

65 

@fastapi_app.get("/me")
def get_me(request: Request):
    """Return ``request.state.user`` if it has been set, else ``None``."""
    try:
        return request.state.user
    except AttributeError:
        # No user was attached to this request's state.
        return None

69 

70 

71fastapi_app.mount("/", graphql_app)