Mary Clair Thompson
08/27/2020, 6:35 PM
Julien Allard
08/27/2020, 10:12 PM
Unexpected error: TimeoutError()
error. Sometimes the error happens before any tasks are started. I'm really unsure how to debug this, so any help is appreciated!
Jovan Visnjic
08/28/2020, 9:13 AM
I'm trying PrefectResult with Prefect Core server running on my local machine. According to the docs, I understood that it should cache the result in Prefect's database. I am running this simple example, extended from the one in the docs; I just added random raising of exceptions:
import random
from prefect import Flow, task, unmapped
from prefect.engine.results import PrefectResult

@task(result=PrefectResult())
def add(x, y=1):
    if random.random() > 0.7:
        raise Exception('I failed on purpose')
    return x + y

with Flow("my handled flow!") as flow:
    first_result = add.map(list(range(10)), y=unmapped(2))
When I restart the flow, it doesn't just run the failed mapped tasks; it runs all of them again, including the successful ones. On the other hand, it works fine if I use LocalResult. Am I missing something? Any help would be much appreciated.
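(One detail worth checking here, as a hedged aside: when a flow is executed with a plain flow.run() rather than through an agent, result checkpointing is disabled by default, so results are never written and a restart re-runs everything. A minimal sketch of turning it on, assuming the standard config toggle:

import os

# Checkpointing must be enabled before the run starts; agents set this
# automatically, but a local flow.run() does not.
os.environ["PREFECT__FLOWS__CHECKPOINTING"] = "true"
)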
William Smith
08/28/2020, 10:16 AM
Miecio
08/28/2020, 10:34 AM
itay livni
08/28/2020, 2:05 PM
Can I use apply_map on another Flow's tasks? Basically I am trying to replace a particular flow with apply_map.
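(For reference, a minimal apply_map sketch; the tasks here are hypothetical, not from this thread. apply_map takes an ordinary Python function that composes tasks and maps it over an iterable, which is what makes it a candidate for replacing a whole sub-flow:

from prefect import Flow, apply_map, task

@task
def inc(x):
    return x + 1

@task
def double(x):
    return x * 2

def pipeline(x):
    # A plain function that composes tasks; apply_map maps it over the inputs.
    return double(inc(x))

with Flow("apply-map-example") as flow:
    results = apply_map(pipeline, list(range(5)))
)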
Hawkar Mahmod
08/28/2020, 2:17 PM
Hannah Amundson
08/28/2020, 6:08 PM
from prefect import Flow, task

@task
def double_number(number):
    return number * 2

with Flow("name") as flow:
    number = 4
    double_number(number)
Eric Dobroveanu
08/28/2020, 9:09 PM
Minakshi
08/28/2020, 10:47 PM
Alex Papanicolaou
08/29/2020, 8:14 PM
12:10:11 prefect.CloudTaskRunner Task 'run_simulator[54]': Starting task run...
12:10:11 prefect.run_simulator[54] Starting simulator run
12:10:11 prefect.run_simulator[54] cusip_list [{'secmnem': 'FNMMA3057', 'cusip': '31418CMF8'}]
12:10:11 prefect.run_simulator[54] Loading model 'cf621134-8c36-446a-96b5-7ecde88a33e2'
12:10:22 prefect.run_simulator[54] Simulating pool {'secmnem': 'FNMMA3057', 'cusip': '31418CMF8'}
12:10:31 prefect.run_simulator[54] Number of replicates 6
12:11:59 prefect.CloudTaskRunner Task 'run_simulator[54]': finished task run for task with final state: 'Success'
Here is an example, though (they don't appear super common), where the task succeeded and was later rerun.
One thing you can note is that the model id is different: it is randomly generated (not a big deal), but along with the timestamps it confirms that this is a repeated run, not a duplicated log.
11:55:34 prefect.CloudTaskRunner Task 'run_simulator[6]': Starting task run...
11:55:35 prefect.run_simulator[6] Starting simulator run
11:55:35 prefect.run_simulator[6] cusip_list [{'secmnem': 'FNMMA3774', 'cusip': '31418DFQ0'}]
11:55:35 prefect.run_simulator[6] Loading model 'c410358f-4612-4aef-8f12-e9a3642711de'
11:56:23 prefect.run_simulator[6] Simulating pool {'secmnem': 'FNMMA3774', 'cusip': '31418DFQ0'}
11:56:36 prefect.run_simulator[6] Number of replicates 3
11:57:12 prefect.CloudTaskRunner Task 'run_simulator[6]': finished task run for task with final state: 'Success'
12:06:17 prefect.CloudTaskRunner Task 'run_simulator[6]': Starting task run...
12:06:17 prefect.run_simulator[6] Starting simulator run
12:06:17 prefect.run_simulator[6] cusip_list [{'secmnem': 'FNMMA3774', 'cusip': '31418DFQ0'}]
12:06:17 prefect.run_simulator[6] Loading model '45322fce-d452-4340-9e06-e7bcc2775b84'
12:06:27 prefect.run_simulator[6] Simulating pool {'secmnem': 'FNMMA3774', 'cusip': '31418DFQ0'}
12:06:40 prefect.run_simulator[6] Number of replicates 3
12:07:15 prefect.CloudTaskRunner Task 'run_simulator[6]': finished task run for task with final state: 'Success'
Johnny Bravo
08/29/2020, 8:31 PM
flow.run(executor=DaskExecutor(
    cluster_class=LocalCluster, cluster_kwargs={"n_workers": 2, "threads_per_worker": 1}))
Not sure if this is the right way to do it. Now I have this flow:
with Flow("Files downloader") as flow:
files = get_files()
downloaded_files = download_file.map(files)
import_file.map(downloaded_files)
The problem here is that after the first two downloads, it goes to the next download task instead of getting to the import task. So, because I'm limited to 2 workers at a time, I need to prioritize the import_file task over the download_file task. Is there a better way to do this?
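(One possible workaround, sketched under the assumption that download_file and import_file are ordinary @task functions: fuse the two steps into a single mapped task, so each file is imported as soon as its own download finishes, regardless of how few workers are available:

from prefect import Flow, task

@task
def download_and_import(file):
    # Calling .run() invokes the underlying function of an existing task.
    path = download_file.run(file)
    import_file.run(path)

with Flow("Files downloader") as flow:
    files = get_files()
    download_and_import.map(files)
)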
Hawkar Mahmod
08/30/2020, 11:22 AM
A question about the LOOP signal: if I have a task that calls an API and has to loop until it has no more data to fetch, can I pass each iteration of the call to a downstream task like so:
results = transform_data.map(call_api())
Inside call_api I am using a LOOP signal, but I cannot seem to access the loop results in the next task, transform_data. My understanding was that when using this construct, each iteration of the task was its own task.
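(For context, a sketch of the LOOP mechanics as documented: every iteration re-runs the same task, carrying state through prefect.context, and only the value returned on the final iteration is passed downstream. So transform_data.map receives that single final value, here a list of pages, not one value per iteration; fetch_page below is a hypothetical HTTP helper:

import prefect
from prefect import task
from prefect.engine.signals import LOOP

@task
def call_api():
    # State carried over from the previous iteration (defaults on the first pass).
    payload = prefect.context.get("task_loop_result", {"pages": [], "cursor": None})
    page, next_cursor = fetch_page(payload["cursor"])  # hypothetical helper
    payload["pages"].append(page)
    if next_cursor is not None:
        raise LOOP(result={"pages": payload["pages"], "cursor": next_cursor})
    # Only this final return value is visible to downstream tasks.
    return payload["pages"]
)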
Chris Goddard
08/30/2020, 3:45 PM
…(the backend is correctly set and the runner token is available as an environment variable). However, when I give the flow Docker storage and spin up a Docker agent, nothing happens when I try to trigger a flow from the UI: no errors, it's just like it's not receiving any instructions from Prefect Cloud.
I am working in WSL2 (Windows Subsystem for Linux), which creates all kinds of hellish networking issues (classic Windows), but I've confirmed that Docker is working. I've run the Docker image that was created for my flow and run the flow manually within the container by unpickling it and calling flow.run.
The one thing I thought it might be was a failure to connect to the Docker daemon (in case WSL ran it somewhere else), but I've confirmed that it's running at unix:///var/run/docker.sock (I think earlier versions of WSL had an issue, but I don't think that's what's going on here).
What else could I try? Any suggestions?
prefect diagnostics
output:
{
  "config_overrides": {
    "cloud": {
      "agent": {
        "auth_token": true
      }
    },
    "context": {
      "secrets": false
    }
  },
  "env_vars": [
    "PREFECT__CLOUD__AGENT__AUTH_TOKEN"
  ],
  "system_information": {
    "platform": "Linux-4.19.104-microsoft-standard-x86_64-with-glibc2.29",
    "prefect_version": "0.13.4",
    "python_version": "3.8.1"
  }
}
Minakshi
08/31/2020, 4:31 AM
for dataset in dataset_config['datasets']:
    print('starting flow for dataset' + dataset['dataset_name'])
    flow.run(dataset=dataset['dataset_name'])  # runs this flow on its schedule
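(A hedged note on this pattern: flow.run() honors the flow's schedule and blocks waiting for the next scheduled run, so a loop like this would stall on its first iteration. To fire one immediate run per dataset, the run_on_schedule flag can be turned off:

for dataset in dataset_config['datasets']:
    # run_on_schedule=False triggers a single immediate run per dataset
    flow.run(parameters={"dataset": dataset["dataset_name"]}, run_on_schedule=False)
)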
Eric
08/31/2020, 8:17 AM
Scott Zelenka
08/31/2020, 1:54 PM
…the @task decorator. So it gets stuck in a perpetual loop, because the server wants the client to wait longer than what was specified in the @task decorator.
The Task Concurrency Limiting feature would reduce how often this happens, but it would not catch all 429 exceptions. Sometimes this specific server gets overloaded by other traffic and will dynamically rate-limit all traffic until it has scaled up to handle the extra load.
I'm guessing that I'd need to write some custom retry logic within the task to handle the 429 exceptions, but I'm curious whether anyone else has a way to pipe the Retry-After from a 429 into the Prefect engine's retry_delay parameter for similar rate-limited API calls?
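(One approach that fits here, offered as a sketch rather than a confirmed recipe: raise the RETRY signal manually with a start_time computed from the Retry-After header, which defers that attempt by the server-requested amount instead of the static retry_delay:

from datetime import timedelta

import pendulum
import requests
from prefect import task
from prefect.engine.signals import RETRY

@task(max_retries=10, retry_delay=timedelta(seconds=30))
def call_rate_limited_api(url):
    resp = requests.get(url)
    if resp.status_code == 429:
        # Defer the retry by the server-requested amount instead of retry_delay.
        wait = int(resp.headers.get("Retry-After", "30"))
        raise RETRY(
            message=f"Rate limited; retrying in {wait}s",
            start_time=pendulum.now("UTC").add(seconds=wait),
        )
    resp.raise_for_status()
    return resp.json()
)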
Marwan Sarieddine
08/31/2020, 2:21 PM
Jonas Hanfland
08/31/2020, 3:00 PM
some_task.map(df.groupby("id"))
But it gives me: KeyError: 'Column not found: 0'
Does anyone know if mapping over groups is supported and, if yes, how? Thanks in advance.
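(A common fix, sketched with a hypothetical split_groups task and with df and some_task as in the question: .map() needs a plain list, while a DataFrameGroupBy iterates as (key, frame) pairs, so materialize the groups in an upstream task first:

from prefect import task

@task
def split_groups(df):
    # Materialize the groups into a list that .map() can iterate over.
    return [group for _, group in df.groupby("id")]

# inside the flow:
#     results = some_task.map(split_groups(df))
)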
Vikram Iyer
08/31/2020, 3:06 PM
ad-agent_1 | File "/usr/local/lib/python3.7/site-packages/requests/models.py", line 941, in raise_for_status
ad-agent_1 | raise HTTPError(http_error_msg, response=self)
ad-agent_1 | requests.exceptions.HTTPError: 400 Client Error: Bad Request for url: http://ad-prefect-apollo:4200/
The env variable on the agent container is:
PREFECT__SERVER__ENDPOINT: http://ad-prefect-apollo:4200
I am running all the required services inside Docker containers using a docker-compose file.
Can anyone check and help out?
Chris Goddard
09/01/2020, 1:40 AM
Parallel execution works when I pass LocalDaskExecutor() to the flow.run(...) method, but when I pass it to LocalEnvironment(executor=LocalDaskExecutor()) and then pass that to the flow constructor like this:
with Flow("test", schedule=schedule, environment=LocalEnvironment(executor=LocalDaskExecutor())) as flow:
I only get one task running at once.
Here's my test code:
import time
from datetime import timedelta

import pendulum
import prefect
from prefect import Flow, task
from prefect.engine.executors import LocalDaskExecutor
from prefect.environments import LocalEnvironment
from prefect.schedules import Schedule
from prefect.schedules.clocks import DatesClock

@task
def generate():
    return [x for x in range(0, 40)]

@task
def log_sleep(x):
    logger = prefect.context.get('logger')
    time.sleep(5 + x)
    logger.info(x)
    return x * x

@task
def collect(lst):
    logger = prefect.context.get('logger')
    logger.info(lst)

schedule = Schedule(clocks=[DatesClock([pendulum.now() + timedelta(seconds=5)])])

with Flow("test", schedule=schedule, environment=LocalEnvironment(executor=LocalDaskExecutor())) as flow:
    nums = generate()
    results = log_sleep.map(nums)
    x = collect(results)

flow.run()
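(A hedged explanation of the behavior: flow.run() executes locally and does not consult the environment attached to the flow; the environment's executor only takes effect when the run is launched through a backend agent. For a local test, the executor can be passed to flow.run() directly:

from prefect.engine.executors import LocalDaskExecutor

# Passing the executor here parallelizes local runs; the LocalEnvironment
# executor is only consulted by agent-driven (backend) runs.
flow.run(executor=LocalDaskExecutor(scheduler="threads", num_workers=4))
)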
Eric
09/01/2020, 3:50 AM
Sachit Shivam
09/01/2020, 10:46 AM
Sachit Shivam
09/01/2020, 11:18 AM
from prefect import task, Flow, Parameter
@task(log_stdout=True)
def say_hello(name):
    print("Hello, {}!".format(name))

with Flow("My First Flow") as flow:
    name = Parameter('name')
    say_hello(name)

flow.register('Test')
I've created a project named "Test" in the Prefect UI and I've set the parameter "name" to "test".
However, the flow keeps failing.
I'm unable to figure out what I'm doing wrong.
Can anyone point me in the right direction?
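(A small sanity check worth running first, as a hedged suggestion: confirm the flow succeeds in-process with the same parameter before suspecting registration, the project, or the agent:

# Run the flow locally with the parameter the UI would supply.
state = flow.run(parameters={"name": "test"})
print(state)  # a Success state here points the problem at the backend setup
)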
Sachit Shivam
09/01/2020, 11:21 AM
Caleb Moses
09/01/2020, 12:59 PM
…func_task doesn't actually produce a file target, so I'm not sure how to use it.
Caleb Moses
09/01/2020, 1:03 PM
# Here's an example of what I'm thinking
from prefect import Flow, task
from prefect.engine.results import LocalResult

corpus_file = 'corpus.txt'

@task(target=corpus_file, result=LocalResult())
def write_sentence_corpus(sentences):
    with open(corpus_file, 'w') as fp:
        for sent in sentences:
            fp.write(sent + '\n')

with Flow("text_model") as f:
    sentences = extract_sentences()
    corpus = write_sentence_corpus(sentences)
    ...
    # Do more modelling
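(A sketch of the idiom the docs point at for this, using the same names as above: rather than writing the file by hand, return the data and let the configured result write it to the target; on later runs the task resolves as Cached when the target file already exists. The checkpoint=True flag matters for plain local runs:

from prefect import task
from prefect.engine.results import LocalResult

@task(target="corpus.txt", checkpoint=True, result=LocalResult(dir="."))
def write_sentence_corpus(sentences):
    # The returned value is serialized (pickled by default) to the target
    # by LocalResult; an existing target short-circuits the task as Cached.
    return "\n".join(sentences)
)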
Caleb Moses
09/01/2020, 1:09 PM
Saranya Elumalai
09/01/2020, 2:00 PM
start = Parameter("start")
start.set_downstream(s3_connection, flow=flow)
s3_connection.set_downstream(lin_of_bus, flow=flow)
s3_connection.set_downstream(company, flow=flow)
....
end_task = final_task()
end_task.set_upstream(lin_of_bus,flow=flow)
end_task.set_upstream(company, flow=flow)
flow.run(parameters={"start": "hello"}, executor=DaskExecutor())
The schematic diagram below shows the tasks being executed sequentially, not in parallel.
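(A hedged way to narrow this down, with hypothetical tasks rather than the originals: time two independent sleeping tasks under the same executor. A total runtime near one sleep instead of two means the executor is parallelizing fine, which would point the problem at the real flow's dependency structure instead:

import time
from prefect import Flow, task
from prefect.engine.executors import DaskExecutor

@task
def branch(n):
    time.sleep(5)
    return n

with Flow("parallel-check") as check:
    a = branch(1)  # two independent calls: Prefect copies the task
    b = branch(2)

check.run(executor=DaskExecutor())  # ~5s total implies parallel execution
)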
Philip Bennett
09/01/2020, 2:04 PM
prefect agent install kubernetes --api "http://10.154.15.199:4200" --rbac --resource-manager --env DB_HOST=10.104.145.11 --env CRUNCHBASE_API_KEY=XYZ | kubectl apply -f -
I try to access them using os.getenv() but cannot.
Has anyone had similar issues?
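(One hedged debugging step, assuming the agent's --env values are injected into the flow-run jobs the agent launches rather than into the process where the flow is built: os.getenv() at module level runs at flow build/registration time and would see nothing, so read the variable inside a task at run time to see what the job actually receives:

import os
from prefect import task

@task(log_stdout=True)
def dump_env():
    # Runs inside the flow-run job, so this reflects the agent-injected env.
    print("DB_HOST =", os.getenv("DB_HOST"))
)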