-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathconnector.py
More file actions
126 lines (106 loc) · 5.7 KB
/
connector.py
File metadata and controls
126 lines (106 loc) · 5.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import dataiku
from dataiku.connector import Connector
import logging
from utils import get_github_client, fetch_issues
import datetime
import json
import re
class GithubSearchPullRequestsConnector(Connector):
    """DSS connector that lists GitHub pull requests linked to a team of users.

    Pull requests are found through the GitHub search API, either authored by
    ("open_by") or reviewed by ("reviewed_by") each configured user handle,
    restricted to the configured owner/organization and date window.
    """

    @staticmethod
    def resolve_github_team_handles(github_team_handles):
        """Return the list of user handles, expanding a single JSON-array string.

        A one-element list whose only item looks like "[...]" is treated as a
        substituted DSS variable holding a JSON list of users.
        """
        looks_like_json_list = (
            len(github_team_handles) == 1
            and re.fullmatch(r'\[.*\]', github_team_handles[0]) is not None
        )
        if looks_like_json_list:
            # Variable containing list of users
            return json.loads(github_team_handles[0])
        return github_team_handles

    @staticmethod
    def resolve_and_parse_date_parameter(date_value, field_name, required):
        """Resolve *date_value* to a YYYY-MM-DD string ("" when optional and empty).

        Accepts either a literal date or a ${name} reference to a DSS custom
        variable. Raises ValueError when a required value is missing/empty or
        when the resolved value is not in YYYY-MM-DD form.
        """
        candidate = (date_value or "").strip()
        if not candidate:
            if required:
                raise ValueError("{} is mandatory and must be in YYYY-MM-DD format or a variable like ${{var_name}}".format(field_name))
            return ""
        reference = re.fullmatch(r"\$\{([^}]+)\}", candidate)
        if reference is not None:
            # ${name} syntax: look the actual value up in the DSS custom variables
            variable_name = reference.group(1)
            candidate = dataiku.get_custom_variables().get(variable_name, "").strip()
            if not candidate:
                if required:
                    raise ValueError("{} variable '{}' is empty or undefined".format(field_name, variable_name))
                return ""
        if re.fullmatch(r"\d{4}-\d{2}-\d{2}", candidate) is None:
            raise ValueError("{} must resolve to a YYYY-MM-DD date, got '{}'".format(field_name, candidate))
        return candidate

    @staticmethod
    def build_search_query(link_to_users, user_handle, owner, state, since_date, closed_before_date):
        """Assemble the GitHub search query string for one user handle."""
        parts = ["{}:{}".format(link_to_users, user_handle)]
        parts.append("user:{}".format(owner))
        parts.append("is:pr")
        parts.append("created:>{}".format(since_date))
        if state in ["open", "closed"]:
            parts.append("state:{}".format(state))
        # The closed-before bound only makes sense when filtering on closed PRs
        if state == "closed" and closed_before_date:
            parts.append("closed:<{}".format(closed_before_date))
        return " ".join(parts)

    def __init__(self, config, plugin_config):
        """Read the plugin configuration and prepare the GitHub client."""
        super().__init__(config, plugin_config)  # pass the parameters to the base class
        self.github_client = get_github_client(config)
        self.owner = config["owner"]
        self.github_team_handles = self.resolve_github_team_handles(config["github_team_handles"])
        self.link_to_users = config["link_to_users"]
        self.state = config["state"]
        self.since_date = self.resolve_and_parse_date_parameter(config.get("since_date"), "since_date", required=True)
        self.closed_before_date = self.resolve_and_parse_date_parameter(
            config.get("closed_before_date"), "closed_before_date", required=False
        )
        self.fetch_additional_costly_fields = config["fetch_additional_costly_fields"]
        self.enable_auto_retry = config["enable_auto_retry"]
        self.number_of_fetch_retry = config["number_of_fetch_retry"]
        # Unique ids of issues fetched so far, shared across the author and reviewer passes
        self.fetched_issues_unique_ids = []

    def get_read_schema(self):
        # Let DSS infer the schema from the columns returned by the generate_rows method
        return None

    def generate_rows(self, dataset_schema=None, dataset_partitioning=None, partition_id=None, records_limit=-1):
        """Yield one row per matching pull request: author pass first, then reviewer pass."""
        budget = records_limit
        query_date = datetime.datetime.now()
        issues = []
        if self.link_to_users in ["all", "open_by"]:
            issues = self.fetch_issues_for_users("author", records_limit, budget, query_date)
        # Only run the reviewer pass while the record limit has not been reached
        under_limit = records_limit == -1 or len(self.fetched_issues_unique_ids) < records_limit
        if under_limit and self.link_to_users in ["all", "reviewed_by"]:
            if records_limit != -1:
                budget -= len(self.fetched_issues_unique_ids)
            issues += self.fetch_issues_for_users("reviewed-by", records_limit, budget, query_date)
        yield from issues

    def fetch_issues_for_users(self, link, records_limit, remaining_records_to_fetch, query_date):
        """Run one search per team handle, stopping once the record budget is spent."""
        collected = []
        for user_handle in self.github_team_handles:
            batch = self.fetch_issues_for_link_to_users(
                query_date, link, user_handle, remaining_records_to_fetch, records_limit
            )
            collected.extend(batch)
            if records_limit == -1:
                continue  # unlimited: no budget bookkeeping needed
            remaining_records_to_fetch -= len(batch)
            if remaining_records_to_fetch <= 0:
                logging.info("Max number of record reached ({}). Stop fetching.".format(records_limit))
                break
        return collected

    def fetch_issues_for_link_to_users(self, query_date, link_to_users, user_handle, remaining_records_to_fetch, records_limit):
        """Search and fetch the issues for one (link qualifier, user handle) pair."""
        search_query = self.build_search_query(
            link_to_users, user_handle, self.owner, self.state, self.since_date, self.closed_before_date
        )
        logging.info(
            "Fetching Issues corresponding to search query '{}' (remaining records to fetch: {}, already fetched items: {})".format(
                search_query, remaining_records_to_fetch, len(self.fetched_issues_unique_ids)
            )
        )
        return fetch_issues(query_date, self.github_client, search_query, records_limit,
                            self.enable_auto_retry, self.number_of_fetch_retry,
                            self.fetch_additional_costly_fields, link_to_users, user_handle,
                            self.fetched_issues_unique_ids)