Sen
02/25/2022, 9:29 AM
Pending One Run Image▾
Running One image▾
Anna Geller
02/25/2022, 1:00 PM
import requests
from prefect import Flow, task
from prefect.executors import LocalDaskExecutor
@task
def create_url_list():
    """
    Build the list of Ubuntu release URLs to fetch.

    Takes no arguments: every hard-coded release id is appended to the
    old-releases.ubuntu.com base URL.

    Returns:
        list[str]: one URL per entry in ``url_ids``, in the same order.
    """
    # Plain URL with no surrounding "<" / ">": those characters are chat
    # auto-link markup and are not part of a valid request URL.
    base_url = "http://old-releases.ubuntu.com/releases/"

    # NOTE(review): the last five ids repeat earlier entries, so those pages
    # are requested twice -- confirm whether that is intended.
    url_ids = [
        "21.10", "21.04", "20.10", "20.04", "19.10", "19.04", "18.10", "18.04",
        "17.10", "17.04", "16.10", "16.04", "15.10", "15.04", "14.10", "14.04",
        "13.10", "13.04", "12.10", "12.04", "11.10", "11.04", "10.10", "10.04",
        "09.10", "09.04", "08.10", "08.04", "07.10", "07.04", "06.10", "06.06",
        "05.10", "05.04", "04.10", "06.10", "06.06", "05.10", "05.04", "04.10",
    ]
    return [base_url + url_id for url_id in url_ids]
@task
def retrieve_url(url):
    """
    Fetch ``url`` with an HTTP GET and report the size of the response body.

    Args:
        url: address to request.

    Returns:
        The length of the response body in bytes, as a string, when the
        response is OK; ``None`` otherwise.

    Raises:
        requests.exceptions.RequestException: if the request cannot be
            completed, including when the timeout below expires.
    """
    print(url)
    # Without a timeout, requests waits indefinitely on a stalled server,
    # which would tie up a worker for the rest of the run.
    response = requests.get(url, timeout=30)
    if not response.ok:
        return None
    return str(len(response.content))
# Assemble the flow: build the URL list once, then map retrieve_url over it.
with Flow("On_Prem_MapTest", executor=LocalDaskExecutor()) as flow:
    urls = create_url_list()
    url_results = retrieve_url.map(urls)


if __name__ == "__main__":
    # Register only when executed as a script, not when imported.
    flow.register("SampleFlows", labels=["On_Prem_MapTest"])
Sen
02/25/2022, 5:28 PM
Anna Geller
02/25/2022, 5:38 PM
Sen
02/25/2022, 5:42 PM
# Basic Imports
import os
import requests
# Set the Prefect Server GraphQL endpoint. This is done before the prefect
# import below so the value is already in the environment when prefect loads.
# The value is a plain URL: no "<" / ">" chat auto-link markup around it.
os.environ["PREFECT__SERVER__ENDPOINT"] = "http://MY_SERVER_IP:4200/graphql"
from prefect import Flow, task
from prefect.environments import LocalEnvironment
from prefect.engine.executors import LocalDaskExecutor
@task
def create_url_list():
    """
    Build the list of Ubuntu release URLs to fetch.

    Takes no arguments: every hard-coded release id is appended to the
    old-releases.ubuntu.com base URL.

    Returns:
        list[str]: one URL per entry in ``url_ids``, in the same order.
    """
    # Plain URL with no surrounding "<" / ">": those characters are chat
    # auto-link markup and are not part of a valid request URL.
    base_url = "http://old-releases.ubuntu.com/releases/"

    # NOTE(review): the last five ids repeat earlier entries, so those pages
    # are requested twice -- confirm whether that is intended.
    url_ids = [
        "21.10", "21.04", "20.10", "20.04", "19.10", "19.04", "18.10", "18.04",
        "17.10", "17.04", "16.10", "16.04", "15.10", "15.04", "14.10", "14.04",
        "13.10", "13.04", "12.10", "12.04", "11.10", "11.04", "10.10", "10.04",
        "09.10", "09.04", "08.10", "08.04", "07.10", "07.04", "06.10", "06.06",
        "05.10", "05.04", "04.10", "06.10", "06.06", "05.10", "05.04", "04.10",
    ]
    return [base_url + url_id for url_id in url_ids]
@task
def retrieve_url(url):
    """
    Fetch ``url`` with an HTTP GET and report the size of the response body.

    Args:
        url: address to request.

    Returns:
        The length of the response body in bytes, as a string, when the
        response is OK; ``None`` otherwise.

    Raises:
        requests.exceptions.RequestException: if the request cannot be
            completed, including when the timeout below expires.
    """
    print(url)
    # Without a timeout, requests waits indefinitely on a stalled server,
    # which would tie up a worker for the rest of the run.
    response = requests.get(url, timeout=30)
    if not response.ok:
        return None
    return str(len(response.content))
def main():
    """
    Build the "On_Prem_MapTest" flow and register it.

    The flow calls ``create_url_list`` and maps ``retrieve_url`` over its
    result. It is given a ``LocalEnvironment`` and a thread-based
    ``LocalDaskExecutor`` with 16 workers, then registered with
    ``"SampleFlows"`` and the label ``"On_Prem_MapTest"``.
    """
    with Flow("On_Prem_MapTest") as flow:
        get_urls = create_url_list()
        retrieve_url.map(get_urls)

    # Thread scheduler: the mapped tasks spend their time waiting on HTTP
    # responses. Use scheduler="processes" instead for CPU-bound work.
    executor = LocalDaskExecutor(scheduler="threads", num_workers=16)

    # NOTE(review): with the legacy environments API, a flow that has an
    # environment is executed with that environment's executor, so the Dask
    # executor is handed to LocalEnvironment rather than only being set on
    # the flow -- confirm against the installed Prefect version.
    flow.environment = LocalEnvironment(executor=executor)
    flow.executor = executor

    # Registering the Flow
    flow.register("SampleFlows", labels=["On_Prem_MapTest"])


if __name__ == "__main__":
    main()
Anna Geller
02/25/2022, 5:51 PM
Sen
02/25/2022, 5:53 PM