Vitaly Shulgin
12/19/2020, 1:41 PMHTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Job.batch \"\" is invalid: [metadata.name: Required value: name or generateName is required, spec.template.spec.containers[0].name: Required value, spec.template.spec.restartPolicy: Required value: valid values: \"OnFailure\", \"Never\"]","reason":"Invalid","details":{"group":"batch","kind":"Job","causes":[{"reason":"FieldValueRequired","message":"Required value: name or generateName is required","field":"metadata.name"},{"reason":"FieldValueRequired","message":"Required value","field":"spec.template.spec.containers[0].name"},{"reason":"FieldValueRequired","message":"Required value: valid values: \"OnFailure\", \"Never\"","field":"spec.template.spec.restartPolicy"}]},"code":422}
simone
12/19/2020, 2:18 PMimport prefect
from prefect import task, Flow, Parameter, flatten, unmapped
from prefect.executors import DaskExecutor
from prefect.run_configs import LocalRun
@task
def test_parallel(a):
time.sleep(20)
with Flow("filtering-counting",run_config=LocalRun(), executor = DaskExecutor(address='<tcp://193.10.16.58:3111>')) as flow:
all_data = Parameter('all_data',default = list(range(10)))
mapped = test_parallel.map(all_data)
flow.register(project_name="test")
I have updated to the new version of prefect and I get prefect.CloudFlowRunner | Flow run RUNNING: terminal tasks are incomplete.
In the setup with the previous version of prefect I was not getting the error.
I still get the error even if flow.set_reference_tasks([mapped])
I have being going through the docs but I still cannot understand where the error is coming from. Thanks!Pedro Machado
12/21/2020, 5:37 AMShellTask
. I am running a flow using the docker agent and I am passing some env vars to the agent with the -e var=value
option. I suppose these variables available to the shell task. Am I correct? or do I need to pass read them from the environment and pass them explicitly to the task?Sagun Garg
12/21/2020, 9:24 AMJonas Hanfland
12/21/2020, 10:59 AMrunning
, causing it to run for days, but locally it seems to run fine. The logs are empty. It's a BigQueryTask
which writes the result to a table.
What would be the best way to figure out what's going wrong?Mary Clair Thompson
12/21/2020, 1:28 PMKevin Weiler
12/21/2020, 5:45 PMliren zhang
12/21/2020, 6:38 PMdef get_task_run(flow_run_id):
task_run_query = """
query {
task_run ( where:{flow_run_id:{_eq: "%s"}}){
id,
name,
start_time,
end_time,
state,
state_message,
created,
flow_run_id,
heartbeat,
logs {message
}
state_result,
state_start_time,
state_timestamp,
task_id,
tenant_id,
updated,
version
}
}
""" % flow_run_id
# logs {
# id,
# level,
# message
# },
task_run_results = json.loads(Client().graphql(task_run_query).to_json())
for task_run in task_run_results['data']['task_run']:
task_json = json.dumps(task_run).replace('\\', '\\\\').replace("'", "\\'")
print('liren again.............')
save_to_file(task_json,'task_run.log')
mithalee mohapatra
12/22/2020, 7:31 AMJoël Luijmes
12/22/2020, 8:25 AMSagun Garg
12/22/2020, 11:01 AMAjith Kumara Beragala Acharige Lal
12/22/2020, 4:04 PMKrzysztof Nawara
12/22/2020, 6:09 PMSean Talia
12/22/2020, 6:49 PMPedro Martins
12/22/2020, 7:24 PMaircraftlib
comes already installed but when prefect jobs start running it can't find the package.
[2020-12-22 18:10:45+0000] INFO - prefect.S3 | Downloading aircraft-etl/2020-12-22t18-10-34-141798-00-00 from dr-prefect
No module named 'aircraftlib'
Traceback (most recent call last):
File "/usr/local/bin/prefect", line 8, in <module>
sys.exit(cli())
File "/usr/local/lib/python3.6/site-packages/click/core.py", line 829, in __call__
return self.main(*args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/click/core.py", line 782, in main
rv = self.invoke(ctx)
File "/usr/local/lib/python3.6/site-packages/click/core.py", line 1259, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/usr/local/lib/python3.6/site-packages/click/core.py", line 1259, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/usr/local/lib/python3.6/site-packages/click/core.py", line 1066, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/usr/local/lib/python3.6/site-packages/click/core.py", line 610, in invoke
return callback(*args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/prefect/cli/execute.py", line 90, in flow_run
raise exc
File "/usr/local/lib/python3.6/site-packages/prefect/cli/execute.py", line 67, in flow_run
flow = storage.get_flow(storage.flows[flow_data.name])
File "/usr/local/lib/python3.6/site-packages/prefect/storage/s3.py", line 115, in get_flow
return cloudpickle.loads(output)
File "/usr/local/lib/python3.6/site-packages/cloudpickle/cloudpickle.py", line 562, in subimport
__import__(name)
ModuleNotFoundError: No module named 'aircraftlib'
Any clue how can I make it work?itay livni
12/22/2020, 10:49 PMapply_map
the graph is fine. The flow runs but is incorrect in the sense that the case
statement is not mapped over. With apply_map
the execution graph is messy with lots of loops and just plain wrong.Pedro Machado
12/23/2020, 4:49 AMDbtShellTask
is running. I set the agent default logging to DEBUG using the cli switch and I set Prefect Server's logging level via the env var PREFECT__LOGGING__LEVEL=DEBUG
I also tried passing these arguments to DbtShellTask
return_all=True,
log_stdout=True,
log_stderr=True,
Any ideas?Amanda Wee
12/23/2020, 11:41 AMNabeel
12/23/2020, 1:00 PMGanesh Kalyansundaram
12/23/2020, 1:05 PMRobert Sokolowski
12/23/2020, 2:23 PMKyle Flanagan
12/23/2020, 7:52 PMimport os
os.environ['PREFECT__FLOWS__CHECKPOINtING'] = 'true'
from prefect import task, Flow
from prefect.engine.results import LocalResult
@task(result=LocalResult(dir='/tmp/prefect-res'))
def my_task():
print("in task")
raise RuntimeError("oh no!")
with Flow('MyFlow') as flow:
my_task()
state = flow.run()
print("Location is: >", state.result[flow.get_tasks()[0]]._result.location, "<")
Result is:
Location is: > None <
If I return
some value from my_task()
the result gets stored, but exceptions do not.Brett Naul
12/23/2020, 8:52 PMversion_group_id
seems like the most appropriate way to do that...but I don't want to rely on it if it might go awayjack
12/23/2020, 9:39 PMFailed to load Flow from C:\Users\JackChoi\.prefect\flows\hello-flow-jack-v2.prefect
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/prefect/environments/storage/local.py", line 105, in get_flow
return extract_flow_from_module(module_str=flow_location)
File "/usr/local/lib/python3.8/site-packages/prefect/utilities/storage.py", line 119, in extract_flow_from_module
module = importlib.import_module(module_name)
File "/usr/local/lib/python3.8/importlib/__init__.py", line 127, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
File "<frozen importlib._bootstrap>", line 1014, in _gcd_import
File "<frozen importlib._bootstrap>", line 991, in _find_and_load
File "<frozen importlib._bootstrap>", line 973, in _find_and_load_unlocked
ModuleNotFoundError: No module named 'C'
0.14.0 = {'_schema': 'Invalid data type: None'}
Does anyone know what's happening here and how I could fix it? Cheers!mithalee mohapatra
12/24/2020, 1:41 AMabhilash.kr
12/24/2020, 8:30 AMale
12/24/2020, 9:07 AMitay livni
12/24/2020, 4:19 PMapply_map
the only way to map over ifelse
and case
statements?wiretrack
12/24/2020, 8:18 PMwiretrack
12/24/2020, 8:18 PMcustomer_flow = Flow('customer_flow')
customer_flow.register(project_name="All Flows")
customer_uptodate = check_status(source='db.sqlite3', target='db_target.sqlite3', table='customers')
test_target_db = test_target_db()
test_source_db = test_source_db()
customer_flow.add_task(test_source_db)
customer_flow.add_task(test_target_db)
customer_flow.add_task(customer_uptodate)
I get the error “ValueError: Could not infer an active Flow context.” (edited)
but I couldn’t understand what exactly i’m doing wrong using the imperative API. registered the flow, registerd the tasks, add_task to flow. i didn’t understand what could I do differently, since the error appears right at my first task (customer_uptodate = check_status(source=‘db.sqlite3’, target=‘db_target.sqlite3’, table=‘customers’)