# prefect-community
s
Hello, I'm very new here, so apologies if this sounds like a newbie question. I'm currently reviewing some Prefect 1.0 code with a view to refactoring it into Prefect 2.0, and one of the Prefect features it currently uses is prefect.core.parameter.DateTimeParameter, which doesn't seem to be supported in Prefect 2.0. Is that the correct interpretation? If so, what would be the Prefect 2.0 alternative? Thanks in advance
👋 1
a
It's totally supported, even with pydantic validation! In Prefect 2.0, your flow parameters are simply your flow function's arguments, and you can add type annotations to ensure that your datetime parameter gets validated as such. Here is an example for a string, but this could also be a datetime object:
Copy code
from prefect import flow

@flow
def say_hi(user_name: str):
    print(f"Hi {user_name}!")

if __name__ == "__main__":
    say_hi("Marvin")
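For the DateTimeParameter case specifically, the Prefect 2.0 version would be a flow argument annotated with datetime (e.g. def my_flow(start: datetime) under @flow), and the pydantic validation mentioned above coerces inputs such as ISO-8601 strings into datetime objects. The stdlib-only sketch below imitates that coercion step so it runs without Prefect installed; coerce_datetime and report are hypothetical names, and this is not how Prefect implements validation internally.

```python
from datetime import datetime, timezone
from typing import Union


def coerce_datetime(value: Union[str, datetime]) -> datetime:
    """Hypothetical stand-in for the validation a `datetime` annotation triggers."""
    if isinstance(value, datetime):
        return value
    if isinstance(value, str):
        # fromisoformat handles strings like "2022-04-29T08:30:00+00:00"
        return datetime.fromisoformat(value)
    raise TypeError(f"expected datetime or ISO-8601 string, got {type(value).__name__}")


def report(start: Union[str, datetime]) -> str:
    # In Prefect 2.0 this would be `@flow def report(start: datetime)`;
    # here the coercion is done by hand so the example runs without Prefect.
    start = coerce_datetime(start)
    return f"Running report for {start:%Y-%m-%d %H:%M}"


if __name__ == "__main__":
    print(report("2022-04-29T08:30:00+00:00"))
    print(report(datetime(2022, 4, 29, 8, 30, tzinfo=timezone.utc)))
```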
s
Hello! Thank you for the prompt reply. Let me give that a try and get back to you
👍 1
Hi Anna. I'm also looking at the scheduling documentation, which shows this code:
Copy code
from prefect import DeploymentSpec
from prefect.orion.schemas.schedules import CronSchedule

DeploymentSpec(
    name="scheduled-deployment",
    flow_location="/path/to/flow.py",
    schedule=CronSchedule(cron="0 0 * * *"),
)
I'm following the example, but the import of DeploymentSpec doesn't seem to work. Is this line meant to be from prefect.deployments import DeploymentSpec? Apologies if I'm barking up the wrong tree!
a
@Sang Young Noh, you're spot on, this import should be:
Copy code
from prefect.deployments import DeploymentSpec
Where did you find this wrong import? I'd love to correct those docs.
s
Hello! Here: https://orion-docs.prefect.io/concepts/schedules/ The CronSchedule and IntervalSchedule sections have the wrong imports, and I think there are others there as well.
šŸ™ 1
👍 1
Hello again! I'm working through the scheduler documentation, and at the moment we have the following configuration in Prefect 1:
• We set an IntervalClock with an interval of timedelta(minutes=5). The code is something like clock = IntervalClock(interval=timedelta(minutes=5), start_date=start_date)
• The schedule is then set with night_schedule = Schedule(clocks=[clock], filters=[morning_filter]), where morning_filter restricts runs to between 8:30 am and 8:54 am each morning. My interpretation is that this schedules a flow run every 5 minutes from 8:30 to 8:54 each morning.
I've been checking the Prefect 2 documentation, and I was wondering what the best approach would be to replicate this kind of scheduling in Prefect 2?
To put it more simply: if I wanted to run a pipeline every 5 minutes between 8:30 and 8:54 in the morning, which scheduler would you suggest?
a
Sorry, it took me some time to write it up. You can do that using rrule schedules. Check this example and LMK if you need help building your custom schedule: https://discourse.prefect.io/t/does-prefect-2-0-support-filters-and-adjustments-e-g-to-restrict-the-schedule-to-only-run-during-business-hours/856
s
Hi Anna! Thank you for the link. After experimenting, I think I figured it out. Just for reference:
Copy code
import pendulum
from dateutil.rrule import rrule, MINUTELY

PollingSchedule = rrule(
    freq=MINUTELY,
    interval=5,
    dtstart=pendulum.datetime(2022, 4, 29, 18, 0),
    byhour=list(range(8, 9)),  # hour 8 only
    # range(30, 52) covers minutes 30-51; with the 5-minute step from dtstart,
    # the rule fires at minutes 30, 35, 40, 45, 50
    byminute=list(range(30, 52)),
)
a
Yup, exactly! Nice work! šŸ™Œ
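To compare the two setups, here is a stdlib-only sketch of the times the Prefect 1 configuration (a 5-minute IntervalClock plus a filter keeping only times between 8:30 and 8:54) would produce on one day; it should line up with the minutes the rrule above yields (30, 35, 40, 45, 50). slots_in_window is a hypothetical helper, and it assumes the clock starts on a whole hour (as the rrule's dtstart does) and that the filter is inclusive at both ends.

```python
from datetime import datetime, time, timedelta
from typing import List


def slots_in_window(
    day_start: datetime, interval: timedelta, window_start: time, window_end: time
) -> List[datetime]:
    """Every `interval` from `day_start` for one day, kept only inside [window_start, window_end]."""
    slots = []
    current = day_start
    day_end = day_start + timedelta(days=1)
    while current < day_end:
        # Mimics a between-times filter; inclusive ends are an assumption.
        if window_start <= current.time() <= window_end:
            slots.append(current)
        current += interval
    return slots


if __name__ == "__main__":
    midnight = datetime(2022, 5, 4)
    for slot in slots_in_window(midnight, timedelta(minutes=5), time(8, 30), time(8, 54)):
        print(slot.strftime("%H:%M"))
```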
s
Hi Anna. Good morning, I hope you're having a good day. I have another question: in Prefect 1, I have a list of flows with their schedules 'baked in', so to speak, but I realise that with Prefect 2 you have to assign each flow a DeploymentSpec with the rrule (or cron, etc.) schedule attached. If I have a number of flows, each with its own DeploymentSpec, how could I run them all at once from a list, or in other words concurrently?
The way I have it in my mind at the moment: if I have flow1.py and flow2.py in a directory, the flowExecutor.py file would have something along the lines of
Copy code
from prefect.deployments import DeploymentSpec

FlowDeploymentNames = ["flow1", "flow2"]
Flowfiles = ["./flow1.py", "./flow2.py"]

for i in range(0, len(Flowfiles) - 1):
    # Testing a loop over flows we've made that do very boring stuff..
    DeploymentSpec(
        name=FlowDeploymentNames[i],
        flow_location=Flowfiles[i],
        tags=['tutorial', 'test'],
        parameters={'name': 'Leo'},
    )
However, I'm pretty sure this is not the way to go about things
OK, I've just checked the above code, and it doesn't seem to deploy both flows. However, the following code seems to be fine:
Copy code
from prefect import flow
from prefect.deployments import DeploymentSpec
from prefect.task_runners import SequentialTaskRunner
from prefect.flow_runners import SubprocessFlowRunner
from prefect.task_runners import DaskTaskRunner

# DeploymentSpec is not a callable object, i.e. we can't stuff it inside a list
# and execute them at the same time
# In reality, we would probably want the FlowRunner..
FlowDeploymentNames = ["flow1", "flow2"]
Flowfiles = ["./flow1.py", "./flow2.py"]

DeploymentSpec(
    name=FlowDeploymentNames[0],
    flow_location=Flowfiles[0],
    #tags=['tutorial','test'],
    parameters={'name': 'Leo'},
    flow_runner=SubprocessFlowRunner(),
)

DeploymentSpec(
    name=FlowDeploymentNames[1],
    flow_location=Flowfiles[1],
    #tags=['tutorial','test'],
    parameters={'name': 'Leo'},
    flow_runner=SubprocessFlowRunner(),
)
Just wondering if there is a way to optimize this process? Thanks
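A likely reason the loop version registered only one deployment: range(0, len(Flowfiles) - 1) stops one index early, so with two files it only visits index 0. The stdlib sketch below shows this, plus the usual fix of pairing names and paths with zip; plain dicts are hypothetical stand-ins for the DeploymentSpec(...) calls so it runs without Prefect.

```python
from typing import Dict, List

FlowDeploymentNames = ["flow1", "flow2"]
Flowfiles = ["./flow1.py", "./flow2.py"]

# Original loop bounds: range(0, 2 - 1) == range(0, 1), i.e. only index 0.
buggy_indices = list(range(0, len(Flowfiles) - 1))

# zip pairs each name with its file, so there are no indices to get wrong.
# In the real file each dict would instead be a DeploymentSpec(...) call.
specs: List[Dict[str, str]] = [
    {"name": name, "flow_location": path}
    for name, path in zip(FlowDeploymentNames, Flowfiles)
]

if __name__ == "__main__":
    print(buggy_indices)  # [0]
    print([spec["name"] for spec in specs])  # ['flow1', 'flow2']
```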
a
Your process to batch-create deployments in one go is fine and valid. To make it easier to keep deployment names and flow files in sync, you could perhaps bind them into a dictionary format before looping. Here is what I mean:
Copy code
from prefect.deployments import DeploymentSpec

deploy_specs = [
    dict(deployment_name="flow1", flow_path="./flow1.py"),
    dict(deployment_name="flow2", flow_path="./flow2.py"),
    dict(deployment_name="flow3", flow_path="./flow3.py"),
]
for ds in deploy_specs:
    config = DeploymentSpec(name=ds["deployment_name"], flow_location=ds["flow_path"])
But my personal favorite would be a more functional approach - here is an example I was using:
Copy code
from prefect.deployments import DeploymentSpec
from prefect.flows import Flow
from prefect.orion.schemas.schedules import SCHEDULE_TYPES

# from prefect.flow_runners import DockerFlowRunner
from typing import Any, Dict, List
from flows.async_flow import async_flow
from flows.crypto_prices_etl import crypto_prices_etl
from flows.repo_trending_check import repo_trending_check


def set_deployment_spec(
    flow: Flow,
    environment: str = "dev",
    schedule: SCHEDULE_TYPES = None,
    parameters: Dict[str, Any] = None,
    tags: List[str] = None,
) -> DeploymentSpec:
    deploy_tags = [environment] if tags is None else [environment, *tags]
    return DeploymentSpec(
        flow=flow,
        name=f"{flow.name}_{environment}",
        schedule=schedule,
        tags=deploy_tags,
        parameters=parameters,
        # flow_runner=DockerFlowRunner()
    )


set_deployment_spec(async_flow)
set_deployment_spec(crypto_prices_etl)
set_deployment_spec(repo_trending_check)
s
Hello Anna. Thank you for the message. I will analyse the code and see what I can do on my side.
👍 1
Hello Anna. I'm just trying to implement your proposed solution, and I have a couple of rather basic questions, so I apologize in advance! 1. In this part:
Copy code
return DeploymentSpec(
    flow=flow,
    name=f"{flow.name}_{environment}",
    schedule=schedule,
    tags=deploy_tags,
    parameters=parameters,
    # flow_runner=DockerFlowRunner()
)
The flow we wish to attach the deployment spec to is passed in through the input variable flow=flow. For example, if I were to construct a flow like this:
Copy code
@flow(name="leonardo_dicapriflow")
def leonardo_dicapriflow(name: str):
    log_message(name)
    return
would flow be the decorator name, or the function name?
2. If I wanted to deploy the functional file you shared onto Prefect Cloud, would it be the following command?
Copy code
prefect deployment create functionalDeployment.py
which should show the async_flow, crypto_prices_etl, and repo_trending_check deployments scheduled in the Prefect Cloud UI? Thanks!
a
No need to apologize for questions šŸ˜„
#1 would flow be the decorator name, or the function name?
Neither: this needs to be the flow function object, so in this case:
flow=leonardo_dicapriflow
#2 Exactly!
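To make #1 concrete: @flow(name=...) replaces the function with a flow object bound to the function's name, and set_deployment_spec reads the display name from flow.name. The sketch below mimics that with a tiny hypothetical flow decorator and FakeFlow class (not Prefect's real Flow) so it runs anywhere; the _dev suffix comes from the environment="dev" default in set_deployment_spec.

```python
from typing import Any, Callable


class FakeFlow:
    """Hypothetical stand-in for prefect.flows.Flow: wraps a function and carries a name."""

    def __init__(self, fn: Callable[..., Any], name: str) -> None:
        self.fn = fn
        self.name = name

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        return self.fn(*args, **kwargs)


def flow(name: str) -> Callable[[Callable[..., Any]], FakeFlow]:
    # Hypothetical decorator factory mirroring the @flow(name=...) call shape.
    def decorator(fn: Callable[..., Any]) -> FakeFlow:
        return FakeFlow(fn, name=name)

    return decorator


@flow(name="leonardo_dicapriflow")
def leonardo_dicapriflow(name: str) -> str:
    return f"hello {name}"


def deployment_name(flow_obj: FakeFlow, environment: str = "dev") -> str:
    # Same naming rule as set_deployment_spec: f"{flow.name}_{environment}"
    return f"{flow_obj.name}_{environment}"


if __name__ == "__main__":
    # Pass the object bound to the function name, not a string.
    print(deployment_name(leonardo_dicapriflow))  # leonardo_dicapriflow_dev
```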
It looks like your package arenkods cannot be found on the PYTHONPATH. It's a general Python problem unrelated to Prefect: you would need to either build it as a custom package or add arenkods to your PYTHONPATH. I see you deleted your message; did you solve it?
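A quick way to check this outside Prefect is importlib.util.find_spec, which returns None when a top-level module can't be located on sys.path (which PYTHONPATH feeds into at interpreter startup). The sketch builds a throwaway package in a temporary directory to show the before/after; the package name is a placeholder, and prepending to sys.path here is only a diagnostic. For deployments, installing the package or exporting PYTHONPATH is the more durable fix.

```python
import importlib
import importlib.util
import sys
import tempfile
from pathlib import Path
from typing import Tuple


def is_importable(module_name: str) -> bool:
    """True if Python can locate `module_name` on the current sys.path."""
    return importlib.util.find_spec(module_name) is not None


def demo(package_name: str = "arenkods") -> Tuple[bool, bool]:
    """Check importability before and after putting a temp package dir on sys.path."""
    before = is_importable(package_name)
    with tempfile.TemporaryDirectory() as tmp:
        pkg_dir = Path(tmp) / package_name
        pkg_dir.mkdir()
        (pkg_dir / "__init__.py").write_text("")  # empty package marker
        sys.path.insert(0, tmp)  # roughly what PYTHONPATH=tmp does at startup
        try:
            importlib.invalidate_caches()
            after = is_importable(package_name)
        finally:
            sys.path.remove(tmp)
    return before, after


if __name__ == "__main__":
    print(demo())  # (False, True) if no real arenkods is installed
```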
s
(Sorry, had to edit the last message! It was getting very messy.) I see, OK. In the second case, I have a custom library called arenkods, which provides some functions for API calls. I've attached some files: ngflowPrefect2Test.py contains the scheduling part you kindly suggested, and testflow contains a @task and @flow I wish to run with the scheduler. In essence, there is a @task function called NGExtraction and a @flow function called NG_Daily_Polling(), but I seem to be getting an import error on my arenkods library. I've checked that the arenkods library has the __init__.py file included, but I get this error:
ModuleNotFoundError: No module named 'arenkods'
with the command: prefect deployment create ##
Just wondering if I am missing something here? Custom imports with flows, tasks, and deployments seem to require an additional step.
a
so my message from above still applies
s
Hi Anna! Ok I see haha
👍 1
a
If you need an example for building a simple Python package https://github.com/anna-geller/packaging-prefect-flows/blob/master/setup.py
s
yes! ok lovely thank you
It seems my conda environment has trouble with having both prefect and the custom module env at the moment - I will get back to you on this
👍 1
Hi again! I'm currently trying to understand a few differences between Prefect 1 and Prefect 2. Is it fair to say that the GraphQL API for viewing Prefect flows and IDs has been replaced by deployment inspection? Or am I comparing apples and oranges here?
Also, I see that Prefect 1 supported GitHub repository flows. Is this feature still in Prefect 2, or has it been deprecated?
Ah, never mind, I think this code covers it:
Copy code
from prefect import flow, task
from typing import List
import httpx

@task(retries=3)
def get_stars(repo: str):
    url = f"https://api.github.com/repos/{repo}"
    count = httpx.get(url).json()["stargazers_count"]
    print(f"{repo} has {count} stars!")

@flow(name="Github Stars")
def github_stars(repos: List[str]):
    for repo in repos:
        get_stars(repo)

# run the flow!
github_stars(["PrefectHQ/Prefect", "PrefectHQ/miter-design"])
a
• There is a REST API instead of GraphQL, and inspecting flow and task runs will mainly be possible via the UI and CLI
• GitHub storage is on the roadmap for 2.0
• The code you referenced is just a flow example that reads GitHub stars; it's not storage related
🦜 1
s
Ah ok! Thank you.
👍 1
Hello again. I'm currently testing some scheduling, as with the problem you helped with before. I created a deployment schedule, and this is its current specification when I look with the command 'prefect deployment inspect ###':
Copy code
{
    'id': 'bcf880c4-3335-4d29-91e1-188d9a9f00d6',
    'created': '2022-05-10T10:10:39.941171+00:00',
    'updated': '2022-05-10T10:10:39.938996+00:00',
    'name': 'test_dev',
    'flow_id': '140c4016-bb9e-4601-be9c-88b2b97d0839',
    'flow_data': {'encoding': 'blockstorage', 'blob': '{"data": "\\"/Users/sang.yn/.prefect/storage/b055f28e-6432-42f5-99d0-f646d1753459\\"", "block_id": "0673abf2-e734-4cdf-bae3-b2e30cf28956"}'},
    'schedule': {'rrule': 'DTSTART:20220504T080000\nRRULE:FREQ=MINUTELY;INTERVAL=5;BYHOUR=8;BYMINUTE=30,35,40,45,50', 'timezone': 'UTC'},
    'is_schedule_active': True,
    'parameters': {},
    'tags': ['dev'],
    'flow_runner': {'type': 'subprocess', 'config': {'env': {}, 'stream_output': False, 'condaenv': None, 'virtualenv': None}}
}
So from the rrule schedule I've made, it triggers every 5 minutes between 8:30 am and 8:52 am, and I'm expecting scheduled runs between 8:30 and 8:52. With that assumption, I had a look at the Prefect server UI to see what scheduling has been prepared, and this is what I see. Digging into the details, I see that there is a scheduled flow run every day, but the time seems to differ from the rrule time I defined. Do you know anything about this issue? I think I'm missing a step somewhere. Thank you!
a
If the 'started after' and 'started before' labels are confusing, don't worry, they're confusing to me too. It's a known issue, and I've asked the UI team about it. Regarding the schedule itself, you can test whether it's scheduled properly using rrule directly as shown here, and to inspect the scheduled runs, you can preview the queued runs in the work queue using e.g.:
Copy code
prefect work-queue preview --hours 12 'acffbcc8-ae65-4c83-a38a-96e2e5e5b441'
It returns the scheduled start time, flow run ID, flow run name, and deployment ID of each queued run.
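For a quick sanity check of the stored rule without extra dependencies, the sketch below expands only the BYHOUR and BYMINUTE parts of the rule string from the prefect deployment inspect output (written in standard RRULE: form) into times of day. times_of_day and parse_rrule_parts are hypothetical helpers that ignore FREQ, INTERVAL, and DTSTART, so they are no substitute for a real RFC 5545 parser such as dateutil.rrule.rrulestr; they only confirm which wall-clock times the rule names, and those times are in the schedule's timezone (UTC in this deployment).

```python
from typing import Dict, List

RULE = "RRULE:FREQ=MINUTELY;INTERVAL=5;BYHOUR=8;BYMINUTE=30,35,40,45,50"


def parse_rrule_parts(rule: str) -> Dict[str, str]:
    """Split 'RRULE:KEY=VAL;KEY=VAL' (prefix optional) into a dict, without validation."""
    body = rule.split(":", 1)[1] if rule.startswith("RRULE:") else rule
    return dict(part.split("=", 1) for part in body.split(";") if part)


def times_of_day(rule: str) -> List[str]:
    """HH:MM combinations allowed by BYHOUR x BYMINUTE; all other rule parts are ignored."""
    parts = parse_rrule_parts(rule)
    hours = sorted(int(h) for h in parts["BYHOUR"].split(","))
    minutes = sorted(int(m) for m in parts["BYMINUTE"].split(","))
    return [f"{h:02d}:{m:02d}" for h in hours for m in minutes]


if __name__ == "__main__":
    print(times_of_day(RULE))  # ['08:30', '08:35', '08:40', '08:45', '08:50']
```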
🙌 1
s
Ah, OK! So it is a known issue. Good to know. I'll see what workaround I can find.
👍 1
Hi Anna. I have another question: I'm currently trying to create some flows to 'store'. I know that with the current procedure, if you have a flow in a .py file, let's say Aflow.py, you could do something like:
Copy code
DeploymentSpec(
    flow_location='./Aflow.py',
    name=f"test_{environment}",
    schedule=schedule,
    tags=deploy_tags,
    parameters=parameters,
    #flow_runner=DockerFlowRunner(),
    flow_runner=SubprocessFlowRunner(),
)
The idea I have at the moment is to build a class that creates the 'flow' for me and automates the scheduling as well. What I mean is a class structure with a set template for the flow, with tasks within it:
Copy code
class ConstructFlow:
    def __init__(self, description, flowname, flowFunction, start, end, backfill=False):
        self.description = description
        self.flowname = flowname
        self.flowFunction = flowFunction
        self.backfill = backfill
        self.start = start
        self.end = end

    @task
    def Extraction(self):
        ...  # Code here

    @flow
    def GetFlow(self):
        self.Extraction()
        ...
So that I can, theoretically, just modify the input for the class and then call GetFlow() for a fully functional flow. For example:
Copy code
NG_flow = ConstructFlow('Lorem Ipsum', 'Lorem Ipsum', ng.gas_price, start_date, end_date)
NG_flow.GetFlow()
I suppose my question is: is this even possible? I'm currently hacking away at it but just wondering what your thoughts were. And also, in this case, the returned 'flow' will have a different function object name, right? That would mean scheduling these functions based on the unique function object name would be impossible, so would that be a barrier for this kind of approach?
a
build a class that creates the 'flow' for me
I wouldn't recommend that since classes are stateful and flow is supposed to be a dynamic concept
s
I see ok. So, having flows individually (so, for example, contained within their own functions and/or with their own .py files) would be what you would recommend?
a
exactly! this is the easiest to build, test, extend, deploy and operationalize
🙌 1
s
Ok, I see. Lovely. Thank you
👍 1
a
especially with Prefect 2.0, having flows as functions (rather than classes created with a context manager) allows for much greater flexibility - you could e.g. import and call this flow from another flow if you wanted to
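As a rough illustration of the function-based shape (plain Python, with the Prefect decorators left out so it runs anywhere): the values the ConstructFlow class kept as instance state become parameters, and a parent function simply calls the child, which is the shape subflows take. In Prefect 2.0 you would put @task on extract and @flow on the other two, with the flow name and description going into the @flow(...) arguments; all names here, including fetch_price, are hypothetical.

```python
from datetime import date, timedelta
from typing import List


def fetch_price(day: date) -> float:
    # Hypothetical stand-in for a real API call such as ng.gas_price.
    return float(day.day)


def extract(day: date) -> float:
    # Would carry @task in Prefect 2.0.
    return fetch_price(day)


def price_flow(start: date, end: date) -> List[float]:
    # Would carry @flow(name=..., description=...) in Prefect 2.0.
    # start/end arrive as parameters instead of living on a class instance.
    n_days = (end - start).days + 1
    return [extract(start + timedelta(days=i)) for i in range(n_days)]


def parent_flow(start: date, end: date) -> float:
    # Calling one flow function from another is the subflow pattern.
    return sum(price_flow(start, end))


if __name__ == "__main__":
    print(parent_flow(date(2022, 5, 1), date(2022, 5, 3)))  # 6.0
```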
s
I will most likely have to cross that bridge of doing subflows eventually!
a
ask me about it then, I ❤️ subflows
s
Hahaha no worries. I'll let you know
👍 1
A question on agents: I currently have a deployment that is queued with a schedule of running every morning at 8:30 am. I ran the command 'prefect agent start ###' and I get the following:
Copy code
Starting agent with ephemeral API...
[Prefect ASCII-art banner]
Agent started! Looking for work from queue '63b46bd0-addd-4ed7-b0e9-d56119339dc0'...
A rather stupid question, I understand, but I just need clarification: this agent will only 'identify' the work at those scheduled times, and the rest of the time it will be waiting, right?
Actually just tested it now - seems like that is the case!
👍 1
a
yup correct, agent picks up work from the work queue and the flow runner creates the infrastructure for the run and runs the flow at the right time
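Conceptually (a simplified model, not Prefect's actual agent code), the agent is a poll loop: every few seconds it asks its work queue for runs whose scheduled start time has arrived and hands them to the flow runner; between polls it just waits. The stdlib sketch below simulates that with a fake clock; due_runs, simulate_agent, the 10-second poll step, and the run names are all hypothetical.

```python
from datetime import datetime, timedelta
from typing import Dict, List, Set


def due_runs(scheduled: Dict[str, datetime], now: datetime, submitted: Set[str]) -> List[str]:
    """IDs of runs whose start time has arrived and that were not picked up yet."""
    return sorted(
        run_id
        for run_id, start in scheduled.items()
        if start <= now and run_id not in submitted
    )


def simulate_agent(
    scheduled: Dict[str, datetime],
    begin: datetime,
    end: datetime,
    poll_every: timedelta,
) -> Dict[str, datetime]:
    """Poll from begin to end and record the poll time at which each run was picked up."""
    picked_up: Dict[str, datetime] = {}
    now = begin
    while now <= end:
        for run_id in due_runs(scheduled, now, set(picked_up)):
            picked_up[run_id] = now  # here the flow runner would start the run
        now += poll_every  # between polls the agent just waits
    return picked_up


if __name__ == "__main__":
    schedule = {
        "morning-run": datetime(2022, 5, 11, 8, 30),
        "tomorrow-run": datetime(2022, 5, 12, 8, 30),
    }
    print(simulate_agent(
        schedule,
        begin=datetime(2022, 5, 11, 8, 29, 55),
        end=datetime(2022, 5, 11, 8, 31),
        poll_every=timedelta(seconds=10),
    ))
```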
s
Hi Anna. Just on deployments: when I run prefect deployment create ###.py, the deployment uploads fine, but the flows/deployments seem to run upon successful deployment creation. Is this meant to be a feature, just to test that the deployment has been successful?
a
not sure I understand - create only sends deployment metadata to the backend, it doesn't run it. You can then use
prefect deployment run
to run it on the agent
s
Doh, I think I had a call to the flow within each flow.py file; that's why it was called. Never mind, my mistake. Sorry haha. Edit: Yes, just checked. This was the case
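This is ordinary Python behavior: loading a file executes its top-level statements, so a bare my_flow() call at module level runs whenever the file is loaded. Wrapping the call in if __name__ == "__main__":, as in the say_hi example at the top of this thread, keeps it for python flow1.py only. The sketch demonstrates this with runpy.run_path, assuming (not confirmed in this thread) that prefect deployment create loads the file under a module name other than __main__; the file contents and load helper are hypothetical.

```python
import runpy
import tempfile
from pathlib import Path

UNGUARDED = """
RAN = []
def my_flow():
    RAN.append("ran")
my_flow()  # top-level call: runs whenever the file is loaded
"""

GUARDED = """
RAN = []
def my_flow():
    RAN.append("ran")
if __name__ == "__main__":
    my_flow()  # only runs when executed as a script
"""


def load(source: str, run_name: str = "loaded_module") -> list:
    """Execute `source` from a temp file, as a loader would, and return its RAN list."""
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "flow1.py"
        path.write_text(source)
        module_globals = runpy.run_path(str(path), run_name=run_name)
    return module_globals["RAN"]


if __name__ == "__main__":
    print(load(UNGUARDED))                     # ['ran']
    print(load(GUARDED))                       # []
    print(load(GUARDED, run_name="__main__"))  # ['ran']
```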
👍 1
Hi again. I'm trying to connect some flows to some dummy Dask clients hosted on Docker, and I was wondering: is it possible to visualize task graphs (https://docs.dask.org/en/stable/graphviz.html) of any flows that would be executed in Prefect? Thanks
a
would you mind posting this in a new thread? this is a completely different issue. Also, are there some steps you've already taken to implement this? if so, could you explain? what problem would having those visualizations solve? please answer in a separate thread to separate the issues into separate threads, thanks a lot
s
Yeah sorry, I will post a more worked on version of this. I think when I posted the query I was a bit tired. haha
a
No worries, also if this is no longer an issue you can skip it