Carl
02/25/2021, 10:15 AM
upstream_task will fix this because the load(clean) task needs to run after the DataFrame() bit.
clean = impute.map(data_cols, replacement_dict=unmapped({np.nan: 0})) # task
clean = DataFrame(clean, column_names) # function, not a task
load(clean) # task
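For context, a minimal sketch of how an explicit ordering edge can be added when one step is a plain function rather than a task; the task names and sample data below are illustrative, not Carl's actual code:
from prefect import Flow, task, unmapped
import numpy as np

@task
def impute(col, replacement_dict):
    # replace NaNs (or other mapped keys) according to the mapping
    return [replacement_dict.get(v, v) for v in col]

@task
def load(cols):
    print("loading", cols)

with Flow("impute-and-load") as flow:
    data_cols = [[1.0, np.nan], [np.nan, 2.0]]
    cleaned = impute.map(data_cols, replacement_dict=unmapped({np.nan: 0}))
    # Passing the mapped result into load creates the data dependency;
    # upstream_tasks adds an ordering edge even when no data is passed.
    load(cleaned, upstream_tasks=[cleaned])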
Adam
02/25/2021, 12:07 PM
When calling register with an idempotency_key set, the flow is uploaded to storage despite it not having changed. Is this intended? Command used in the comments.
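For reference, a sketch of registration with an idempotency key derived from the flow itself (the project name is illustrative). Note the key only controls whether a new flow version is created on the backend; storage can still be built and uploaded when register runs, which may be what Adam is seeing.
from prefect import Flow, task

@task
def hello():
    print("hello")

with Flow("idempotent-registration") as flow:
    hello()

# serialized_hash() changes only when the flow's metadata changes, so
# repeated register calls with this key do not bump the flow version.
flow.register(
    project_name="examples",
    idempotency_key=flow.serialized_hash(),
)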
Nikhil Akki
02/25/2021, 1:46 PM
Andor Tóth
02/25/2021, 2:41 PM
Andor Tóth
02/25/2021, 2:43 PM
date becomes a string
Like this
@task(task_run_name='{name}-{date:%FT%T}')
def say_hello(name):
    logger = prefect.context.get("logger")
    logger.info("Hello, %s!", name)
    logger.info('Flow run name: %s', prefect.context.flow_run_name)
jorwoods
02/25/2021, 3:37 PM
Pedro Machado
02/25/2021, 4:56 PM
Andor Tóth
02/25/2021, 5:12 PM
Andor Tóth
02/25/2021, 5:30 PM
Andor Tóth
02/25/2021, 5:33 PM
Peyton Runyan
02/25/2021, 5:47 PM
ValueError: Could not infer an active Flow context.
Details in thread
with Flow("Batch update airtable flow") as f:
proj, stry = get_selected_bases()
cmdf = get_communities_2017()
scrubbed_proj, scrubbed_stry = scrub_dataframes(proj, stry, cmdf)
debug_task(scrubbed_proj, scrubbed_stry)
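Not Peyton's actual code, but a minimal reproduction of where that error usually comes from: calling a task outside of an active Flow context.
from prefect import Flow, task

@task(nout=2)                      # nout=2 allows the tuple unpacking below
def get_selected_bases():
    return "proj", "stry"

# Calling the task at module level, outside a `with Flow(...)` block,
# raises: ValueError: Could not infer an active Flow context.
# proj, stry = get_selected_bases()

with Flow("Batch update airtable flow") as f:
    proj, stry = get_selected_bases()   # inside the context this only builds the DAG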
Kieran
02/25/2021, 6:13 PM
Beginning health checks...
System Version check: OK
Traceback (most recent call last):
File "/opt/prefect/healthcheck.py", line 151, in <module>
flows = cloudpickle_deserialization_check(flow_file_paths)
File "/opt/prefect/healthcheck.py", line 44, in cloudpickle_deserialization_check
flows.append(cloudpickle.loads(flow_bytes))
ModuleNotFoundError: No module named 'flows.tasks'
Registering the same flow locally is passing my check.
from prefect.utilities.debug import is_serializable
if is_serializable(flow): ...
My PYTHONPATH is the root of the directory and when SSH'ing into the container I am able to import the "missing" module...
Any pointers?
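One common fix (a sketch with illustrative paths, not a confirmed diagnosis) is to copy the package into the Docker storage image and put its parent directory on PYTHONPATH so cloudpickle can re-import flows.tasks inside the container:
from prefect.storage import Docker

storage = Docker(
    files={
        # local path on the machine doing the build -> path inside the image
        "/repo/flows": "/opt/prefect/flows",
    },
    # parent of the copied package, so `import flows.tasks` resolves at runtime
    env_vars={"PYTHONPATH": "/opt/prefect"},
)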
Matheus Calvelli
02/25/2021, 7:22 PM
Dave
02/25/2021, 7:31 PM
Peyton Runyan
02/25/2021, 7:34 PM
with Flow("example") as f:
    x, y = func_1()
    no_return_func()
    res = final_func(x, y)
Using the example above, how do I signal for final_func to wait on no_return_func?
Do I just add a third argument to the function and have no_return_func return True? Or is there a cleaner way?
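The usual Prefect 1.x answer is to pass the upstream task via upstream_tasks rather than threading a dummy return value through; a sketch using the names from the question:
from prefect import Flow, task

@task(nout=2)          # nout=2 allows `x, y = func_1()` inside the flow
def func_1():
    return 1, 2

@task
def no_return_func():
    print("side effect only")

@task
def final_func(x, y):
    return x + y

with Flow("ordering-example") as f:
    x, y = func_1()
    side_effect = no_return_func()
    # upstream_tasks adds a state dependency without passing any data,
    # so final_func waits for no_return_func to finish first.
    res = final_func(x, y, upstream_tasks=[side_effect])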
Alex Welch
02/25/2021, 10:03 PM
Sean Talia
02/25/2021, 10:26 PM
Maria
02/25/2021, 11:59 PM
from prefect import Flow
from prefect.tasks.shell import ShellTask
task = ShellTask(helper_script="cd ~")
with Flow("My Flow") as f:
contents = task(command='ls')
out = f.run()
[2021-02-26 10:32:28+1100] INFO - prefect.FlowRunner | Beginning Flow run for 'My Flow'
[2021-02-26 10:32:28+1100] INFO - prefect.TaskRunner | Task 'ShellTask': Starting task run...
[2021-02-26 10:32:28+1100] ERROR - prefect.TaskRunner | Unexpected error: FileNotFoundError(2, 'The system cannot find the file specified', None, 2, None)
Traceback (most recent call last):
.....
....
python\python38-32\lib\subprocess.py", line 1307, in _execute_child
hp, ht, pid, tid = _winapi.CreateProcess(executable, args,
FileNotFoundError: [WinError 2] The system cannot find the file specified
I do have bash installed; also, specifying the path doesn't help:
task = ShellTask(shell="C:\Windows\System32\bash.exe", helper_script="cd ~")
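One thing worth ruling out (an observation about the snippet above, not a confirmed diagnosis): in a plain Python string, the \b in "...\bash.exe" is a backspace escape, so subprocess never receives the real path; a raw string or forward slashes keeps it intact.
from prefect import Flow
from prefect.tasks.shell import ShellTask

# Raw string so the backslashes (including "\b") survive literally.
# Whether WSL's bash.exe actually works when launched from Windows Python
# is a separate question; the path is taken from the message above.
task = ShellTask(shell=r"C:\Windows\System32\bash.exe", helper_script="cd ~")

with Flow("My Flow") as f:
    contents = task(command="ls")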
matta
02/26/2021, 1:24 AM
Unexpected error: ClientError('400 Client Error: Bad Request for url: http://prefect-apollo:4200/graphql/graphql\n\nThe following error messages were provided by the GraphQL server:\n\n GRAPHQL_VALIDATION_FAILED: Cannot query field "user" on type "Query".\n\nThe GraphQL query was:\n\n query {\n user {\n default_membership {\n tenant {\n slug\n }\n }\n }\n }\n\nThe passed variables were:\n\n null\n',)
Josh
02/26/2021, 2:18 AM
Has anyone used case with mypy? When I try to add a case to the flow, mypy throws Module not callable
with Flow("flow") as flow:
case(task_result, True):
other_task()
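One workaround that may satisfy mypy (an assumption, not an official fix) is to import the case class from its defining module, so mypy does not resolve the name to the prefect.tasks.control_flow.case module itself:
from prefect import Flow, task
from prefect.tasks.control_flow.case import case  # the class, not the module

@task
def check() -> bool:
    return True

@task
def other_task():
    print("ran")

with Flow("flow") as flow:
    result = check()
    with case(result, True):
        other_task()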
Alex Welch
02/26/2021, 4:30 AM
Kamil Okáč
02/26/2021, 12:34 PM
Laura Vaida
02/26/2021, 4:36 PM
Docker(
    env_vars={
        # append modules directory to PYTHONPATH
        "PYTHONPATH": "C://Users/laura.vaida.000/anaconda3/envs/prefect/Lib/site-packages"
    },
    stored_as_script=True
)
Brian Mesick
02/26/2021, 5:29 PM
Container 'flow' state: terminated
Exit Code:: 2
Reason: Error
The job shows up submitted, then ~30 seconds later we get 4 of those errors. Other nearly identical flows are working. The pods don’t hang around long enough to get logs (if they even start). Has anyone else run into this?
Alex Welch
02/26/2021, 7:30 PM
The first issue is a KeyError. The second is that it is showing me the correct key and yet stating that it was not found. I have the secrets defined in the Prefect UI.
Failed to load and execute Flow's environment: KeyError('The secret <secret> was not found. Please ensure that it was set correctly in your tenant: https://docs.prefect.io/orchestration/concepts/secrets.html')
I followed the docs to establish the secret and am using the same variable name (GH_TOKEN) in both the UI and the code.
GH_TOKEN = Secret("GH_TOKEN").get()
STORAGE = GitHub(
    repo="<my_repo_name>",
    path=f"flows/my_flow.py",
    access_token_secret=GH_TOKEN
)
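A sketch of an alternative, keeping the GH_TOKEN name from above: access_token_secret takes the name of a Prefect Secret and resolves it at runtime, so the Secret(...).get() call at registration time is not needed and is one likely source of the KeyError.
from prefect.storage import GitHub

STORAGE = GitHub(
    repo="<my_repo_name>",          # placeholder kept from the message above
    path="flows/my_flow.py",
    # Name of the secret; Prefect looks it up from the tenant when the flow runs.
    access_token_secret="GH_TOKEN",
)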
S K
02/26/2021, 8:02 PM
@task()
def mainprocess(a):
    global df_single_record
    try:
        for indx in df_data_extracted.index + 1:
            df_single_record = df_data_extracted.iloc[indx - 1:indx]
            converttojson(df_single_record)
            postjsontoqapi(df_json_data)
    except Exception as e:
        logger = get_logger()
        logger.error(str(get_pst_time())
                     + '==========ERROR IN mainprocess() WHILE CONVERTING TO JSON/POSTING TO Q-API: '
                     + str(e))
        raise SystemExit(0)
Alex Welch
02/26/2021, 8:06 PM
Is there a way to import the storage and run config from a shared module when using GitHub storage? Reason for this is two-fold. Since we would be using the same block of code for just about every flow, it would help keep it semi-DRY. Second, it would allow us to pass different configs depending on environment variables (dev vs prod).
from prefect_utils import (
    RUN_CONFIG,
    STORAGE
)
flow.storage = STORAGE
flow.run_config = RUN_CONFIG
It works with the S3 storage. But when trying to use GitHub I get the below.
Failed to load and execute Flow's environment: ModuleNotFoundError("No module named 'prefect_utils'")
The module is in the same folder as the flow (flows/). Is this an issue because when the flow is run it is being run from the parent directory, and thus an issue with relative paths?
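A likely explanation (not confirmed from the thread): GitHub storage fetches only the single flow file at runtime, so helper modules such as prefect_utils have to already exist in the execution environment, for example baked into the image used by the run config. A sketch with an illustrative repo and image name:
from prefect.run_configs import DockerRun
from prefect.storage import GitHub

STORAGE = GitHub(
    repo="my-org/my-repo",            # illustrative
    path="flows/my_flow.py",
    access_token_secret="GH_TOKEN",
)

# The image is assumed to have prefect_utils installed (e.g. via its Dockerfile),
# so the import succeeds when the flow file is executed in the container.
RUN_CONFIG = DockerRun(image="my-registry/flows:latest")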
Carlo
02/26/2021, 8:13 PM
flow_a = StartFlowRun(flow_name="A", project_name="examples", wait=True)
flow_b = StartFlowRun(flow_name="B", project_name="examples", wait=True)
flow_c = StartFlowRun(flow_name="C", project_name="examples", wait=True)
flow_d = StartFlowRun(flow_name="D", project_name="examples", wait=True)

with Flow("parent-flow", schedule=weekday_schedule) as flow:
    b = flow_b(upstream_tasks=[flow_a])
    c = flow_c(upstream_tasks=[flow_a])
    d = flow_d(upstream_tasks=[b, c])
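For comparison, a sketch of the same flow-of-flows pattern with flow_a called explicitly inside the flow context and the needed imports shown (schedule omitted):
from prefect import Flow
from prefect.tasks.prefect import StartFlowRun

flow_a = StartFlowRun(flow_name="A", project_name="examples", wait=True)
flow_b = StartFlowRun(flow_name="B", project_name="examples", wait=True)
flow_c = StartFlowRun(flow_name="C", project_name="examples", wait=True)
flow_d = StartFlowRun(flow_name="D", project_name="examples", wait=True)

with Flow("parent-flow") as flow:
    a = flow_a()
    b = flow_b(upstream_tasks=[a])
    c = flow_c(upstream_tasks=[a])
    d = flow_d(upstream_tasks=[b, c])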
Jack Sundberg
02/28/2021, 4:12 AM
Hiodo
03/01/2021, 7:19 AM