Skip to content

ApiRequest

Class that handles the API requests for the interface

Source code in make_us_rich/interface/api_request.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
class ApiRequest:
    """
    Class that handles the API requests for the interface
    """
    def __init__(self):
        try:
            self._config = load_env("api")
        except:
            self._config = {"URL": getenv("URL")}
        self.url = self._config["URL"]


    def prediction(self, currency:str, compare:str, token: str) -> Dict:
        """
        Predict endpoint.

        Parameters
        ----------
        currency: str
            Currency used in the model.
        compare: str
            Compare used in the model.
        token: str
            API token of the user.

        Returns
        -------
        Dict
            Dictionary containing the data and the prediction.
        """
        return requests.put(
            f"{self.url}/predict", 
            params={"currency": currency, "compare": compare, "token": token}
        ).json()


    def number_of_available_models(self) -> Dict:
        """
        Number of available models endpoint.

        Returns
        -------
        Dict
            Dictionary containing the number of available models.
        """
        return requests.get(f"{self.url}/check_models_number").json()

number_of_available_models()

Number of available models endpoint.

Returns:

Type Description
Dict

Dictionary containing the number of available models.

Source code in make_us_rich/interface/api_request.py
45
46
47
48
49
50
51
52
53
54
def number_of_available_models(self) -> Dict:
    """
    Number of available models endpoint.

    Returns
    -------
    Dict
        Dictionary containing the number of available models.
    """
    return requests.get(f"{self.url}/check_models_number").json()

prediction(currency, compare, token)

Predict endpoint.

Parameters:

Name Type Description Default
currency str

Currency used in the model.

required
compare str

Compare used in the model.

required
token str

API token of the user.

required

Returns:

Type Description
Dict

Dictionary containing the data and the prediction.

Source code in make_us_rich/interface/api_request.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def prediction(self, currency:str, compare:str, token: str) -> Dict:
    """
    Predict endpoint.

    Parameters
    ----------
    currency: str
        Currency used in the model.
    compare: str
        Compare used in the model.
    token: str
        API token of the user.

    Returns
    -------
    Dict
        Dictionary containing the data and the prediction.
    """
    return requests.put(
        f"{self.url}/predict", 
        params={"currency": currency, "compare": compare, "token": token}
    ).json()

Authentication

Source code in make_us_rich/interface/authentication.py
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
class Authentication:
    def __init__(self, key: str = "mur-key", cookie_ttl: int = 10) -> Tuple[str, str, bool]:
        """
        Initialize the Authentication class.

        Parameters
        ----------
        cookie_ttl: int
            The time to live of the cookie. Default is 10.

        Returns
        -------
        Tuple[str, bool]:
            The token and the status of the authentication.
        """
        self.key = key
        self.cookie_ttl = cookie_ttl
        self.cookie_name = "make_us_rich_auth"


    def _generate_cookie_token(self) -> str:
        """
        Generate a random string to be used as the token for authentication cookie.

        Returns
        -------
        str:
            The JWT token.
        """
        return jwt.encode(
            payload={
                "username": st.session_state["username"], "expiration_date": self._generate_expiration_date()
            }, 
            key=self.key,
        )


    def _generate_expiration_date(self) -> str:
        """
        Generate the expiration date of the token.

        Returns
        -------
        str:
            The expiration date of the token.
        """
        return (datetime.utcnow() + timedelta(days=self.cookie_ttl)).timestamp()


    def _decode_token(self) -> str:
        """
        Decode the token.

        Returns
        -------
        str:
            The decoded token.
        """
        return jwt.decode(self.token, key=self.key, algorithms=["HS256"])


    def login(self, form_title: str) -> Tuple[str, str, bool]:
        """"""
        self.form_title = form_title
        cookie_manager = stx.CookieManager()

        if "authentication_status" not in st.session_state:
            st.session_state["authentication_status"] = None
        if "username" not in st.session_state:
            st.session_state["username"] = None
        if "role" not in st.session_state:
            st.session_state["role"] = None
        if "api_token" not in st.session_state:
            st.session_state["api_token"] = None

        if st.session_state["authentication_status"] != True:
            try:
                self.token = cookie_manager.get(self.cookie_name)
                self.token = self._decode_token()
                if self.token["expiration_date"] > datetime.utcnow().timestamp():
                    st.session_state["authentication_status"] = True
                    st.session_state["username"] = self.token["username"]
                else:
                    st.session_state["authentication_status"] = False
            except:
                st.session_state["authentication_status"] = None

        if st.session_state["authentication_status"] != True:
            login_form = st.form("Login")
            login_form.subheader(
                "Welcome 👋, please login first.\n"
                "If you don't have an account, it will be created automatically when you submit the form."
            )
            input_username_value = st.session_state["username"] if st.session_state["username"] else ""
            self.username = login_form.text_input("Username", value=input_username_value)
            self.password = login_form.text_input("Password", type="password")

            if login_form.form_submit_button("Submit"):
                user_exist = DatabaseHandler.check_if_user_exist(self.username)
                if user_exist["success"]:
                    results = DatabaseHandler.authentication(self.username, self.password)
                    login_message = f"{results['message']} Welcome back {self.username}! 🎉"
                else:
                    results = DatabaseHandler.create_user(self.username, self.password)
                    login_message = f"{results['message']} Welcome {self.username}! 🎉"
                if results["success"]:
                    st.session_state["authentication_status"] = True
                    st.session_state["username"] = results["username"]
                    self.token = self._generate_cookie_token()
                    cookie_manager.set(
                        self.cookie_name, self.token, 
                        expires_at=datetime.now() + timedelta(self.cookie_ttl)
                    )
                    st.success(login_message)
                    time.sleep(5)
                else:
                    st.error(results["message"])

        if st.session_state["authentication_status"] == True:
            st.session_state["role"] = DatabaseHandler._check_user_role(st.session_state["username"])["role"]
            st.session_state["api_token"] = DatabaseHandler.get_api_token(st.session_state["username"])["token"]
            st.sidebar.title("User Panel")
            st.sidebar.markdown(f"**{st.session_state['username']}**, log out by clicking the button below.", unsafe_allow_html=True)
            if st.sidebar.button("Logout", key="logout"):
                cookie_manager.delete(self.cookie_name)
                st.session_state["authentication_status"] = None
                st.session_state["username"] = None
                st.session_state["role"] = None

        return (
            st.session_state["username"], 
            st.session_state["role"], 
            st.session_state["api_token"],
            st.session_state["authentication_status"],
        )

__init__(key='mur-key', cookie_ttl=10)

Initialize the Authentication class.

Parameters:

Name Type Description Default
cookie_ttl int

The time to live of the cookie. Default is 10.

10

Returns:

Type Description
Tuple[str, bool]

The token and the status of the authentication.

Source code in make_us_rich/interface/authentication.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def __init__(self, key: str = "mur-key", cookie_ttl: int = 10) -> Tuple[str, str, bool]:
    """
    Initialize the Authentication class.

    Parameters
    ----------
    cookie_ttl: int
        The time to live of the cookie. Default is 10.

    Returns
    -------
    Tuple[str, bool]:
        The token and the status of the authentication.
    """
    self.key = key
    self.cookie_ttl = cookie_ttl
    self.cookie_name = "make_us_rich_auth"

_decode_token()

Decode the token.

Returns:

Name Type Description
str str

The decoded token.

Source code in make_us_rich/interface/authentication.py
62
63
64
65
66
67
68
69
70
71
def _decode_token(self) -> str:
    """
    Decode the token.

    Returns
    -------
    str:
        The decoded token.
    """
    return jwt.decode(self.token, key=self.key, algorithms=["HS256"])

Generate a random string to be used as the token for authentication cookie.

Returns:

Name Type Description
str str

The JWT token.

Source code in make_us_rich/interface/authentication.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def _generate_cookie_token(self) -> str:
    """
    Generate a random string to be used as the token for authentication cookie.

    Returns
    -------
    str:
        The JWT token.
    """
    return jwt.encode(
        payload={
            "username": st.session_state["username"], "expiration_date": self._generate_expiration_date()
        }, 
        key=self.key,
    )

_generate_expiration_date()

Generate the expiration date of the token.

Returns:

Name Type Description
str str

The expiration date of the token.

Source code in make_us_rich/interface/authentication.py
50
51
52
53
54
55
56
57
58
59
def _generate_expiration_date(self) -> str:
    """
    Generate the expiration date of the token.

    Returns
    -------
    str:
        The expiration date of the token.
    """
    return (datetime.utcnow() + timedelta(days=self.cookie_ttl)).timestamp()

DatabaseHandler

This class handles the database connection and the queries.

Source code in make_us_rich/interface/database_handler.py
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
class DatabaseHandler:
    """
    This class handles the database connection and the queries.
    """

    @classmethod
    def authentication(cls, username:str, password:str) -> Dict[str, bool]:
        """
        Checks if the user and password are correct

        Parameters
        ----------
        username: str
            The username of the user.
        password: str
            The password of the user.

        Returns
        -------
        dict:
            A dictionary with the success of the authentication.
        """
        try:
            cls._connect()
            cursor = cls.connection.cursor()
            cursor.execute(f"""
                SELECT id FROM users WHERE username = '{username}' AND password = crypt('{password}', password);
            """)
            match = cursor.fetchone()
            cls._disconnect()
            return {
                "success": True, "message": "Authentication successful.", "username": username
            } if match else {"success": False}
        except Exception as e: 
            return {"error": str(e)}


    @classmethod
    def create_user(cls, username:str, password:str) -> Dict[str, Any]:
        """
        Creates a new user in the database.

        Parameters
        ----------
        username: str
            The username of the new user.
        password: str
            The password of the new user.

        Returns
        -------
        dict:
            A dictionary with the success of the creation.
        """
        try:
            cls._connect()
            cursor = cls.connection.cursor()
            cursor.execute(f"""
                INSERT INTO users (username, password)
                VALUES ('{username}', crypt('{password}', gen_salt('bf'))) 
                ON CONFLICT (username) DO NOTHING;
            """)
            cls.connection.commit()
            cls._disconnect()
            cls._add_member_role_to_user(username)
            cls._generate_token_for_user(username)
            cls._init_api_limit_for_user(username)
            return {
                "success": True, "message": "User created successfully.", "username": username,
            }
        except Exception as e: 
            return {"success": False, "message": str(e)}


    @classmethod
    def check_if_user_exist(cls, username:str) -> Dict[str, Any]:
        """
        Checks if a user exists in the database.

        Parameters
        ----------
        username: str
            The username of the user.

        Returns
        -------
        dict:
            A dictionary with the success of the check.
        """
        try:
            cls._connect()
            cursor = cls.connection.cursor()
            cursor.execute(f"""
                SELECT id FROM users WHERE username = '{username}';
            """)
            match = cursor.fetchone()
            cls._disconnect()
            return {"success": True} if match else {"success": False}
        except Exception as e:
            return {"error": str(e)}


    @classmethod
    def get_api_token(cls, username:str) -> Dict[str, str]:
        """
        Gets the API token of a user.

        Parameters
        ----------
        username: str
            The username of the user.

        Returns
        -------
        dict:
            A dictionary with the API token of the user.
        """
        try:
            cls._connect()
            cursor = cls.connection.cursor()
            cursor.execute(f"""
                SELECT token FROM api_tokens
                JOIN users ON users.id = api_tokens.user_id
                WHERE users.username = '{username}';
            """)
            token = cursor.fetchone()
            cls._disconnect()
            return {"success": True, "token": token[0]} if token else {"success": False}
        except Exception as e:
            return {"error": str(e)}


    @classmethod
    def increment_user_api_consumption(cls, username:str) -> Dict[str, Any]:
        """
        Increments the API consumption of a user.

        Parameters
        ----------
        username: str
            The username of the user.

        Returns
        -------
        dict:
            A dictionary with the success of the increment.
        """
        try:
            cls._connect()
            cursor = cls.connection.cursor()
            cursor.execute(f"""
                UPDATE user_api_consumptions SET api_consumption = api_consumption + 1
                WHERE user_id = (SELECT id FROM users WHERE username = '{username}');
            """)
            cls.connection.commit()
            cls._disconnect()
            return {"success": True}
        except Exception as e:
            return {"error": str(e)}


    @classmethod
    def get_user_api_consumption(cls, username:str) -> Dict[str, Any]:
        """
        Gets the API consumption of a user.

        Parameters
        ----------
        username: str
            The username of the user.

        Returns
        -------
        dict:
            A dictionary with the API consumption of the user.
        """
        try:
            cls._connect()
            cursor = cls.connection.cursor()
            cursor.execute(f"""
                SELECT api_consumption FROM user_api_consumptions
                WHERE user_id = (SELECT id FROM users WHERE username = '{username}');
            """)
            consumption = cursor.fetchone()
            cls._disconnect()
            return {"success": True, "consumption": consumption[0]}
        except Exception as e:
            return {"error": str(e)}


    @classmethod
    def _check_user_role(cls, username:str) -> Dict[str, str]:
        """
        Checks the role of the user.

        Parameters
        ----------
        username: str
            The username of the user.

        Returns
        -------
        dict:
            A dictionary with the role of the user.
        """
        try:
            cls._connect()  
            cursor = cls.connection.cursor()          
            cursor.execute(f"""
                SELECT name FROM user_roles
                JOIN users ON users.id = user_roles.user_id
                JOIN roles ON roles.id = user_roles.role_id
                WHERE users.username = '{username}'; 
            """)
            role = cursor.fetchone()[0]
            cls._disconnect()
            return {"role": role}
        except Exception as e: 
            return {"error": str(e)}


    @classmethod
    def _add_member_role_to_user(cls, username:str) -> Dict[str, Any]:
        """
        Adds the role of the user to the database.

        Parameters
        ----------
        username: str
            The username of the user.

        Returns
        -------
        dict:
            A dictionary with the success of the creation.
        """
        try:
            cls._connect()
            cursor = cls.connection.cursor()
            cursor.execute(f"""
                INSERT INTO user_roles (user_id, role_id)
                SELECT u.id, r.id
                FROM (SELECT id FROM users WHERE username = '{username}') u,
                (SELECT id FROM roles WHERE name = 'member') r;
            """)
            cls.connection.commit()
            cls._disconnect()
            return {"success": True}
        except Exception as e: 
            return {"success": False, "message": str(e)}


    @classmethod
    def _generate_token_for_user(cls, username:str) -> Dict[str, Any]:
        """
        Generates a token for the user.

        Parameters
        ----------
        username: str
            The username of the user.

        Returns
        -------
        str:
            The token of the user.
        """
        token = random_string()
        try:
            cls._connect()
            cursor = cls.connection.cursor()
            cursor.execute(f"""
                INSERT INTO api_tokens (user_id, token)
                SELECT u.id, '{token}'
                FROM (SELECT id FROM users WHERE username = '{username}') u;
            """)
            cls.connection.commit()
            cls._disconnect()
            return {"success": True}
        except Exception as e: 
            return {"success": False, "message": str(e)}


    @classmethod
    def _init_api_limit_for_user(cls, username:str) -> Dict[str, Any]:
        """
        Initializes the API limit for the user.

        Parameters
        ----------
        username: str
            The username of the user.

        Returns
        -------
        dict:
            A dictionary with the success of the creation.
        """
        try:
            cls._connect()
            cursor = cls.connection.cursor()
            cursor.execute(f"""
                INSERT INTO user_api_consumptions (user_id, api_consumption)
                SELECT u.id, 0
                FROM (SELECT id FROM users WHERE username = '{username}') u;
            """)
            cls.connection.commit()
            cls._disconnect()
            return {"success": True}
        except Exception as e: 
            return {"success": False, "message": str(e)}


    @classmethod
    def _connect(cls):
        """
        Connects to the database.
        """
        try:
            cls.connection = psycopg2.connect(
                database=getenv("POSTGRES_DB"),
                user=getenv("POSTGRES_USER"),
                host=getenv("HOST"),
                password=getenv("POSTGRES_PASSWORD")
            )
        except Exception as e:
            return e


    @classmethod
    def _disconnect(cls):
        """
        Closes the connection to the database.
        """
        try:
            cls.connection.close()
        except Exception as e:
            return e

_add_member_role_to_user(username) classmethod

Adds the role of the user to the database.

Parameters:

Name Type Description Default
username str

The username of the user.

required

Returns:

Name Type Description
dict Dict[str, Any]

A dictionary with the success of the creation.

Source code in make_us_rich/interface/database_handler.py
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
@classmethod
def _add_member_role_to_user(cls, username:str) -> Dict[str, Any]:
    """
    Adds the role of the user to the database.

    Parameters
    ----------
    username: str
        The username of the user.

    Returns
    -------
    dict:
        A dictionary with the success of the creation.
    """
    try:
        cls._connect()
        cursor = cls.connection.cursor()
        cursor.execute(f"""
            INSERT INTO user_roles (user_id, role_id)
            SELECT u.id, r.id
            FROM (SELECT id FROM users WHERE username = '{username}') u,
            (SELECT id FROM roles WHERE name = 'member') r;
        """)
        cls.connection.commit()
        cls._disconnect()
        return {"success": True}
    except Exception as e: 
        return {"success": False, "message": str(e)}

_check_user_role(username) classmethod

Checks the role of the user.

Parameters:

Name Type Description Default
username str

The username of the user.

required

Returns:

Name Type Description
dict Dict[str, str]

A dictionary with the role of the user.

Source code in make_us_rich/interface/database_handler.py
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
@classmethod
def _check_user_role(cls, username:str) -> Dict[str, str]:
    """
    Checks the role of the user.

    Parameters
    ----------
    username: str
        The username of the user.

    Returns
    -------
    dict:
        A dictionary with the role of the user.
    """
    try:
        cls._connect()  
        cursor = cls.connection.cursor()          
        cursor.execute(f"""
            SELECT name FROM user_roles
            JOIN users ON users.id = user_roles.user_id
            JOIN roles ON roles.id = user_roles.role_id
            WHERE users.username = '{username}'; 
        """)
        role = cursor.fetchone()[0]
        cls._disconnect()
        return {"role": role}
    except Exception as e: 
        return {"error": str(e)}

_connect() classmethod

Connects to the database.

Source code in make_us_rich/interface/database_handler.py
322
323
324
325
326
327
328
329
330
331
332
333
334
335
@classmethod
def _connect(cls):
    """
    Connects to the database.
    """
    try:
        cls.connection = psycopg2.connect(
            database=getenv("POSTGRES_DB"),
            user=getenv("POSTGRES_USER"),
            host=getenv("HOST"),
            password=getenv("POSTGRES_PASSWORD")
        )
    except Exception as e:
        return e

_disconnect() classmethod

Closes the connection to the database.

Source code in make_us_rich/interface/database_handler.py
338
339
340
341
342
343
344
345
346
@classmethod
def _disconnect(cls):
    """
    Closes the connection to the database.
    """
    try:
        cls.connection.close()
    except Exception as e:
        return e

_generate_token_for_user(username) classmethod

Generates a token for the user.

Parameters:

Name Type Description Default
username str

The username of the user.

required

Returns:

Name Type Description
str Dict[str, Any]

The token of the user.

Source code in make_us_rich/interface/database_handler.py
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
@classmethod
def _generate_token_for_user(cls, username:str) -> Dict[str, Any]:
    """
    Generates a token for the user.

    Parameters
    ----------
    username: str
        The username of the user.

    Returns
    -------
    str:
        The token of the user.
    """
    token = random_string()
    try:
        cls._connect()
        cursor = cls.connection.cursor()
        cursor.execute(f"""
            INSERT INTO api_tokens (user_id, token)
            SELECT u.id, '{token}'
            FROM (SELECT id FROM users WHERE username = '{username}') u;
        """)
        cls.connection.commit()
        cls._disconnect()
        return {"success": True}
    except Exception as e: 
        return {"success": False, "message": str(e)}

_init_api_limit_for_user(username) classmethod

Initializes the API limit for the user.

Parameters:

Name Type Description Default
username str

The username of the user.

required

Returns:

Name Type Description
dict Dict[str, Any]

A dictionary with the success of the creation.

Source code in make_us_rich/interface/database_handler.py
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
@classmethod
def _init_api_limit_for_user(cls, username:str) -> Dict[str, Any]:
    """
    Initializes the API limit for the user.

    Parameters
    ----------
    username: str
        The username of the user.

    Returns
    -------
    dict:
        A dictionary with the success of the creation.
    """
    try:
        cls._connect()
        cursor = cls.connection.cursor()
        cursor.execute(f"""
            INSERT INTO user_api_consumptions (user_id, api_consumption)
            SELECT u.id, 0
            FROM (SELECT id FROM users WHERE username = '{username}') u;
        """)
        cls.connection.commit()
        cls._disconnect()
        return {"success": True}
    except Exception as e: 
        return {"success": False, "message": str(e)}

authentication(username, password) classmethod

Checks if the user and password are correct

Parameters:

Name Type Description Default
username str

The username of the user.

required
password str

The password of the user.

required

Returns:

Name Type Description
dict Dict[str, bool]

A dictionary with the success of the authentication.

Source code in make_us_rich/interface/database_handler.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@classmethod
def authentication(cls, username:str, password:str) -> Dict[str, bool]:
    """
    Checks if the user and password are correct

    Parameters
    ----------
    username: str
        The username of the user.
    password: str
        The password of the user.

    Returns
    -------
    dict:
        A dictionary with the success of the authentication.
    """
    try:
        cls._connect()
        cursor = cls.connection.cursor()
        cursor.execute(f"""
            SELECT id FROM users WHERE username = '{username}' AND password = crypt('{password}', password);
        """)
        match = cursor.fetchone()
        cls._disconnect()
        return {
            "success": True, "message": "Authentication successful.", "username": username
        } if match else {"success": False}
    except Exception as e: 
        return {"error": str(e)}

check_if_user_exist(username) classmethod

Checks if a user exists in the database.

Parameters:

Name Type Description Default
username str

The username of the user.

required

Returns:

Name Type Description
dict Dict[str, Any]

A dictionary with the success of the check.

Source code in make_us_rich/interface/database_handler.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
@classmethod
def check_if_user_exist(cls, username:str) -> Dict[str, Any]:
    """
    Checks if a user exists in the database.

    Parameters
    ----------
    username: str
        The username of the user.

    Returns
    -------
    dict:
        A dictionary with the success of the check.
    """
    try:
        cls._connect()
        cursor = cls.connection.cursor()
        cursor.execute(f"""
            SELECT id FROM users WHERE username = '{username}';
        """)
        match = cursor.fetchone()
        cls._disconnect()
        return {"success": True} if match else {"success": False}
    except Exception as e:
        return {"error": str(e)}

create_user(username, password) classmethod

Creates a new user in the database.

Parameters:

Name Type Description Default
username str

The username of the new user.

required
password str

The password of the new user.

required

Returns:

Name Type Description
dict Dict[str, Any]

A dictionary with the success of the creation.

Source code in make_us_rich/interface/database_handler.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
@classmethod
def create_user(cls, username:str, password:str) -> Dict[str, Any]:
    """
    Creates a new user in the database.

    Parameters
    ----------
    username: str
        The username of the new user.
    password: str
        The password of the new user.

    Returns
    -------
    dict:
        A dictionary with the success of the creation.
    """
    try:
        cls._connect()
        cursor = cls.connection.cursor()
        cursor.execute(f"""
            INSERT INTO users (username, password)
            VALUES ('{username}', crypt('{password}', gen_salt('bf'))) 
            ON CONFLICT (username) DO NOTHING;
        """)
        cls.connection.commit()
        cls._disconnect()
        cls._add_member_role_to_user(username)
        cls._generate_token_for_user(username)
        cls._init_api_limit_for_user(username)
        return {
            "success": True, "message": "User created successfully.", "username": username,
        }
    except Exception as e: 
        return {"success": False, "message": str(e)}

get_api_token(username) classmethod

Gets the API token of a user.

Parameters:

Name Type Description Default
username str

The username of the user.

required

Returns:

Name Type Description
dict Dict[str, str]

A dictionary with the API token of the user.

Source code in make_us_rich/interface/database_handler.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
@classmethod
def get_api_token(cls, username:str) -> Dict[str, str]:
    """
    Gets the API token of a user.

    Parameters
    ----------
    username: str
        The username of the user.

    Returns
    -------
    dict:
        A dictionary with the API token of the user.
    """
    try:
        cls._connect()
        cursor = cls.connection.cursor()
        cursor.execute(f"""
            SELECT token FROM api_tokens
            JOIN users ON users.id = api_tokens.user_id
            WHERE users.username = '{username}';
        """)
        token = cursor.fetchone()
        cls._disconnect()
        return {"success": True, "token": token[0]} if token else {"success": False}
    except Exception as e:
        return {"error": str(e)}

get_user_api_consumption(username) classmethod

Gets the API consumption of a user.

Parameters:

Name Type Description Default
username str

The username of the user.

required

Returns:

Name Type Description
dict Dict[str, Any]

A dictionary with the API consumption of the user.

Source code in make_us_rich/interface/database_handler.py
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
@classmethod
def get_user_api_consumption(cls, username:str) -> Dict[str, Any]:
    """
    Gets the API consumption of a user.

    Parameters
    ----------
    username: str
        The username of the user.

    Returns
    -------
    dict:
        A dictionary with the API consumption of the user.
    """
    try:
        cls._connect()
        cursor = cls.connection.cursor()
        cursor.execute(f"""
            SELECT api_consumption FROM user_api_consumptions
            WHERE user_id = (SELECT id FROM users WHERE username = '{username}');
        """)
        consumption = cursor.fetchone()
        cls._disconnect()
        return {"success": True, "consumption": consumption[0]}
    except Exception as e:
        return {"error": str(e)}

increment_user_api_consumption(username) classmethod

Increments the API consumption of a user.

Parameters:

Name Type Description Default
username str

The username of the user.

required

Returns:

Name Type Description
dict Dict[str, Any]

A dictionary with the success of the increment.

Source code in make_us_rich/interface/database_handler.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
@classmethod
def increment_user_api_consumption(cls, username:str) -> Dict[str, Any]:
    """
    Increments the API consumption of a user.

    Parameters
    ----------
    username: str
        The username of the user.

    Returns
    -------
    dict:
        A dictionary with the success of the increment.
    """
    try:
        cls._connect()
        cursor = cls.connection.cursor()
        cursor.execute(f"""
            UPDATE user_api_consumptions SET api_consumption = api_consumption + 1
            WHERE user_id = (SELECT id FROM users WHERE username = '{username}');
        """)
        cls.connection.commit()
        cls._disconnect()
        return {"success": True}
    except Exception as e:
        return {"error": str(e)}

Plots

candlestick_plot(data, currency, compare, pred)

Create candlestick plot.

Parameters:

Name Type Description Default
data pd.DataFrame

Dataframe containing the data to plot.

required
currency str

Currency to plot.

required
compare str

Currency to compare.

required
pred float

Prediction to plot.

required

Returns:

Type Description
go.Figure

Plotly figure.

Source code in make_us_rich/interface/plots.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
def candlestick_plot(data: pd.DataFrame, currency: str, compare: str, pred: float) -> go.Figure:
    """
    Create candlestick plot.

    Parameters
    ----------
    data : pd.DataFrame
        Dataframe containing the data to plot.
    currency : str
        Currency to plot.
    compare : str
        Currency to compare.
    pred : float
        Prediction to plot.

    Returns
    -------
    go.Figure
        Plotly figure.
    """
    fig = go.Figure()
    fig.add_trace(
        go.Candlestick(
            x=data["timestamp"], open=data["open"], high=data["high"], low=data["low"], close=data["close"], 
            name="Candlestick"
        )
    )
    fig.update_layout(
        title=f"{currency.upper()}/{compare.upper()} - last 5 days",
        yaxis_title=f"{currency.upper()} Price",
    )
    pred = float(data.iloc[-1]["close"]) + pred
    timestamp = data["timestamp"].max() + timedelta(hours=1)
    fig.add_trace(
        go.Scatter(
            x=[timestamp], y=[pred], mode="markers", marker_color="red", name="Prediction"
        )
    )
    fig.update_layout(
        title=f"{currency.upper()}/{compare.upper()} - last 5 days",
        yaxis_title=f"{currency.upper()} Price",
        annotations=[
            go.Annotation(
                x=timestamp,
                y=pred,
                text=f"Prediction: {pred:.3f}",
                showarrow=False,
            )
        ],
    )
    return fig

format_data(data)

Format data from API response before plotting.

Parameters:

Name Type Description Default
data Dict[str, Any]

Data from API response.

required

Returns:

Type Description
Tuple[pd.DataFrame, float]

Dataframe containing the data to plot and the prediction.

Source code in make_us_rich/interface/plots.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
def format_data(data: Dict[str, Any]) -> Tuple[pd.DataFrame, float]:
    """
    Format data from API response before plotting.

    Parameters
    ----------
    data : Dict[str, Any]
        Data from API response.

    Returns
    -------
    Tuple[pd.DataFrame, float]
        Dataframe containing the data to plot and the prediction.
    """
    pred = data["prediction"]
    df = pd.DataFrame(
        data=data["data"],
        columns=[
            "timestamp",
            "open",
            "high",
            "low",
            "close",
            "volume",
            "close_time",
            "quote_av",
            "trades",
            "tb_base_av",
            "tb_quote_av",
            "ignore",
        ],
    )
    df["timestamp"] = pd.to_datetime(df["timestamp"])
    df["close"] = pd.to_numeric(df["close"], downcast="float")
    df["open"] = pd.to_numeric(df["open"], downcast="float")
    df["high"] = pd.to_numeric(df["high"], downcast="float")
    df["low"] = pd.to_numeric(df["low"], downcast="float")
    return df, pred

scatter_plot(data, currency, compare, pred)

Create scatter plot.

Parameters:

Name Type Description Default
data pd.DataFrame

Dataframe containing the data to plot.

required
currency str

Currency to plot.

required
compare str

Currency to compare.

required
pred float

Prediction to plot.

required

Returns:

Type Description
go.Figure

Plotly figure.

Source code in make_us_rich/interface/plots.py
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
def scatter_plot(data: pd.DataFrame, currency: str, compare: str, pred: float) -> go.Figure:
    """
    Create scatter plot.

    Parameters
    ----------
    data : pd.DataFrame
        Dataframe containing the data to plot.
    currency : str
        Currency to plot.
    compare : str
        Currency to compare.
    pred : float
        Prediction to plot.

    Returns
    -------
    go.Figure
        Plotly figure.
    """
    fig = go.Figure()
    fig.add_trace(
        go.Scatter(
            x=data["timestamp"], y=data["close"], name="Close Price", line_color="blue", connectgaps=True
        )
    )
    fig.update_layout(
        title=f"{currency.upper()}/{compare.upper()} - last 5 days",
        yaxis_title=f"{currency.upper()} Price",
    )
    pred = float(data.iloc[-1]["close"]) + pred
    timestamp = data["timestamp"].max() + timedelta(hours=1)
    fig.add_trace(
        go.Scatter(
            x=[timestamp], y=[pred], mode="markers", marker_color="red", name="Prediction"
        )
    )
    fig.update_layout(
        title=f"{currency.upper()}/{compare.upper()} - last 5 days",
        yaxis_title=f"{currency.upper()} Price",
        annotations=[
            go.Annotation(
                x=timestamp,
                y=pred,
                text=f"Prediction: {pred:.3f}",
                showarrow=False,
            )
        ],
    )
    return fig

Last update: 2022-05-04