rectalogic
06/28/2022, 6:57 PMLocal
Storage, but then for production we want the same flow to run on our ECS agent using a GitHub
storage pointing to the same repo. With the UniversalRun
config it can run on either agent, but how do we make the storage conditional so it uses the correct storage depending on which agent it's targetted to?Jake
06/28/2022, 7:49 PM[2022-06-24 09:26:07+0000] INFO - prefect.S3 | Flow successfully downloaded. ETag: "032fd4a15a8c777534d795042e1c0ef6", LastModified: 2022-06-23T15:24:59+00:00, VersionId: None
To my understanding, this is the logging from Prefect when the flow is being downloaded from s3 (before running any tasks). We don’t see these flow runs in the UI either. How do we troubleshoot this and prevent this from happening in the future. Shouldn’t zombie killer have ended these flow runs? If they still had a heartbeat shouldn’t they show up in the cloud UI?Jason Thomas
06/28/2022, 7:52 PMApoorva Desai
06/28/2022, 8:59 PMcode snippet moved to thread
The two DbtShellTasks are identical except for the schema. I tried defining the DbtShellTask without schema outside of my flow and tried a few different things inside my flow:
transform_task.run(dbt_kwargs = {"schema": "schema_1"})
but I always get the same error:
ValueError: Could not infer an active Flow context while creating edge to <Task: DbtShellTask>. This often means you called a task outside a `with Flow(...)` block. If you're trying to run this task outside of a Flow context, you need to call `DbtShellTask(...).run(...)`
I can tell I am doing something silly but can't seem to resolve it 😅Monde Phathwa
06/29/2022, 9:36 AMyu zeng
06/29/2022, 9:38 AMMasatoShima
06/29/2022, 9:42 AMfrom prefect.cli import orion
# Start Orion server.
asyncio.run(start_orion_server())
async def start_orion_server() -> None:
await orion.start()
print("Succeed starting Prefect Orion Server.")
return
Surya
06/29/2022, 9:54 AMNaga Sravika Bodapati
06/29/2022, 12:41 PMMatt Delacour
06/29/2022, 2:18 PMIntervalSchedule
object. Would the flows start at different time or all at the same time ?
What I am looking for is to distribute the load on our servers when flows run. And so a new flow should start at a random time and then follow the IntervalSchedule
constraint.Matt Liszewski
06/29/2022, 2:37 PMMadison Schott
06/29/2022, 4:01 PMdata_pipeline_prod
that we want to somehow label that it is deprecated? And then change the one with test in its name to the new data_pipeline_prod
? Or any suggested ways to do this?Josh
06/29/2022, 4:37 PMBrandon T. Kowalski
06/29/2022, 6:11 PMwith Flow("HathiTrust Deposit (MARCXML)") as flow:
deposit = Parameter("deposit", default=False)
This is the existing parameter. I want to add a second so naturally I tried adding another underneath the first like this:
with Flow("HathiTrust Deposit (MARCXML)") as flow:
deposit = Parameter("deposit", default=False)
local_test = Parameter("local_test", default=False)
This results in an error:
UserWarning: Tasks were created but not added to the flow: {<Parameter: local_test>}. This can occur when `Task` classes, including `Parameters`, are instantiated inside a `with flow
How does one add a parameter to an existing flow?Yupei Chen
06/29/2022, 6:11 PMBilly McMonagle
06/29/2022, 6:14 PMharis khan
06/29/2022, 6:31 PMredsquare
06/29/2022, 6:41 PMShaoyi Zhang
06/29/2022, 6:54 PMYury Cheremushkin
06/29/2022, 7:45 PMconfig.toml
and access them via Secrets API or prefect.context.secrets.get()
. But I can't find anything similar in 2.0Jessica Smith
06/29/2022, 8:42 PMWei Mei
06/29/2022, 9:21 PM@task(nout=1)
def determine_run(clock):
if clock == "full_run":
<http://logger.info|logger.info>("full_run clock")
run_type = "full"
<http://logger.info|logger.info>(f"{run_type}")
return run_type
Amol Shirke
06/29/2022, 11:51 PMAustin Anderson
06/30/2022, 12:14 AMfrom prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner
@task
def say_hello(name):
print(f"hello {name}")
@flow(task_runner=DaskTaskRunner(address='tcp://[redacted]:8786'))
def greetings():
say_hello('test')
if __name__ == "__main__":
greetings()
Any ideas?Amogh Kulkarni
06/30/2022, 4:11 AMwonsun
06/30/2022, 6:13 AMSander
06/30/2022, 8:10 AMSurya
06/30/2022, 10:00 AMredsquare
06/30/2022, 10:25 AMharis khan
06/30/2022, 10:28 AMharis khan
06/30/2022, 10:28 AMAnna Geller
06/30/2022, 11:27 AM