Skip to main content

Reading from a subresource

In this section, we'll implement the survey responses stream. Its structure is a little different from the surveys stream because it depends on that stream: the responses are fetched separately for each survey.

Start by creating a new base class for substreams:

class SurveyMonkeySubstream(HttpStream, ABC):
    """Base class for SurveyMonkey streams whose requests depend on a parent stream.

    Every record read from ``parent_stream`` becomes one stream slice, and the
    ``path`` template is formatted with that slice. A template such as
    ``"/v3/surveys/{stream_slice[id]}/responses/"`` therefore issues one
    (paginated) request chain per parent record. No cursor is defined, so only
    full-refresh reads are supported.
    """

    url_base = "https://api.surveymonkey.com"

    def __init__(
        self,
        name: str,
        path: str,
        primary_key: Union[str, List[str]],
        parent_stream: Stream,
        **kwargs: Any,
    ) -> None:
        """
        :param name: name the stream is exposed under.
        :param path: URL path template; ``{stream_slice[...]}`` fields are filled from each parent record.
        :param primary_key: primary key of the emitted records.
        :param parent_stream: stream whose records drive the slices of this stream.
        :param kwargs: forwarded to ``HttpStream.__init__`` (e.g. ``authenticator``).
        """
        self._name = name
        self._path = path
        self._primary_key = primary_key
        self._parent_stream = parent_stream
        super().__init__(**kwargs)

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """Return ``{"next_url": <url>}`` taken from ``links.next``, or ``None`` on the last page."""
        # ``or {}`` also guards against an explicit ``"links": null`` in the payload.
        next_url = (response.json().get("links") or {}).get("next")
        # A missing/empty "next" link means pagination is complete; None is the
        # conventional "no more pages" value for an Optional token.
        return {"next_url": next_url} if next_url else None

    def request_params(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> MutableMapping[str, Any]:
        """Return query parameters for the next request.

        The first page is requested with ``per_page=_PAGE_SIZE``. Later pages reuse
        every query parameter of the API-provided ``next`` link, returned as a
        mapping (not the raw query string) to honour the declared return type.
        """
        from urllib.parse import parse_qsl, urlparse

        if next_page_token:
            next_query = urlparse(next_page_token["next_url"]).query
            # NOTE: dict() keeps only the last value of a repeated key; the
            # pagination link is assumed to use single-valued params — confirm.
            return dict(parse_qsl(next_query, keep_blank_values=True))
        return {"per_page": _PAGE_SIZE}

    def parse_response(self, response: requests.Response, **kwargs: Any) -> Iterable[Mapping]:
        """Yield each record in the response's ``data`` array (nothing if absent or null)."""
        yield from response.json().get("data") or []

    @property
    def name(self) -> str:
        """Stream name supplied at construction."""
        return self._name

    def path(
        self,
        *,
        stream_state: Optional[Mapping[str, Any]] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> str:
        """Format the path template with the current slice (one parent record)."""
        return self._path.format(stream_slice=stream_slice)

    @property
    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
        """Primary key supplied at construction."""
        return self._primary_key

    def stream_slices(
        self, stream_state: Optional[Mapping[str, Any]] = None, **kwargs: Any
    ) -> Iterable[Optional[Mapping[str, Any]]]:
        """Yield every parent record as one slice.

        The parent is always read in full-refresh mode, so all parent records (and
        therefore all of their sub-resources) are visited on every sync.
        """
        # ``Stream.stream_slices`` declares ``sync_mode`` as a keyword argument; pass it explicitly.
        for parent_slice in self._parent_stream.stream_slices(sync_mode=SyncMode.full_refresh):
            yield from self._parent_stream.read_records(
                sync_mode=SyncMode.full_refresh, stream_slice=parent_slice
            )

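This class is similar to the base class, but it does not support incremental reads, and its stream slices are generated by reading records from a parent stream. Each parent record (a survey) becomes one slice, and the path template fills in the survey's `id` from that slice. This is how we'll ensure we always read all survey responses.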

Note that with this approach, the connector will checkpoint after reading the responses for each survey.

Don't forget to update the streams method to also instantiate the survey responses stream:

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """Build the connector's streams: ``surveys`` and its dependent ``survey_responses``.

        Both streams share one token authenticator built from ``config["access_token"]``.
        The very same ``surveys`` instance is handed to the substream as its parent.
        """
        authenticator = TokenAuthenticator(token=config["access_token"])

        surveys_stream = SurveyMonkeyBaseStream(
            name="surveys",
            path="/v3/surveys",
            primary_key="id",
            data_field="data",
            cursor_field="date_modified",
            authenticator=authenticator,
        )
        responses_stream = SurveyMonkeySubstream(
            name="survey_responses",
            path="/v3/surveys/{stream_slice[id]}/responses/",
            primary_key="id",
            authenticator=authenticator,
            parent_stream=surveys_stream,
        )

        return [surveys_stream, responses_stream]

Before moving on, we'll enable request caching on the surveys stream. Without it, the surveys records are fetched twice: once for the surveys stream itself and again to build the survey responses stream's slices. To do this, override the use_cache property on the SurveyMonkeyBaseStream class so that it returns True:

    @property
    def use_cache(self) -> bool:
        """Enable HTTP response caching for this stream.

        The survey responses substream reads this stream's records to build its
        slices; caching lets those reads reuse responses already fetched.
        """
        caching_enabled = True
        return caching_enabled

Now add the new stream to the configured catalog (integration_tests/configured_catalog.json):

{
"streams": [
{
"stream": {
"name": "surveys",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"]
},
"sync_mode": "incremental",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "survey_responses",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
}
]
}

Then create a new schema file at source_survey_monkey_demo/schemas/survey_responses.json. You can use the Connector Builder to generate the schema, or paste the one provided below:

{
"$schema": "http://json-schema.org/schema#",
"properties": {
"analyze_url": {
"type": ["string", "null"]
},
"collect_stats": {
"properties": {
"status": {
"properties": {
"open": {
"type": ["number", "null"]
}
},
"type": ["object", "null"]
},
"total_count": {
"type": ["number", "null"]
},
"type": {
"properties": {
"weblink": {
"type": ["number", "null"]
}
},
"type": ["object", "null"]
}
},
"type": ["object", "null"]
},
"date_created": {
"type": ["string", "null"]
},
"date_modified": {
"type": ["string", "null"]
},
"href": {
"type": ["string", "null"]
},
"id": {
"type": ["string", "null"]
},
"language": {
"type": ["string", "null"]
},
"nickname": {
"type": ["string", "null"]
},
"preview": {
"type": ["string", "null"]
},
"question_count": {
"type": ["number", "null"]
},
"response_count": {
"type": ["number", "null"]
},
"title": {
"type": ["string", "null"]
}
},
"type": "object"
}

You should now be able to read your survey responses:

poetry run source-survey-monkey-demo read --config secrets/config.json --catalog integration_tests/configured_catalog.json

In the next section we'll update the connector so it reads stream slices concurrently.